mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Added support for ILB Global Access
Uses beta api when global access is enabled and ga api otherwise Deprecate the older load-balancer-type annotation Unit test to verify global access Also added tests to verify new and old load-balancer-type annotations Addressed review comments, staticcheck fixes fixed typo
This commit is contained in:
parent
bdfc8f62b4
commit
c09c8abc3c
@ -33,8 +33,11 @@ type LoadBalancerType string
|
||||
const (
|
||||
// ServiceAnnotationLoadBalancerType is annotated on a service with type LoadBalancer
|
||||
// dictates what specific kind of GCP LB should be assembled.
|
||||
// Currently, only "internal" is supported.
|
||||
ServiceAnnotationLoadBalancerType = "cloud.google.com/load-balancer-type"
|
||||
// Currently, only "Internal" is supported.
|
||||
ServiceAnnotationLoadBalancerType = "networking.gke.io/load-balancer-type"
|
||||
|
||||
// Deprecating the old-style naming of LoadBalancerType annotation
|
||||
deprecatedServiceAnnotationLoadBalancerType = "cloud.google.com/load-balancer-type"
|
||||
|
||||
// LBTypeInternal is the constant for the official internal type.
|
||||
LBTypeInternal LoadBalancerType = "Internal"
|
||||
@ -50,6 +53,11 @@ const (
|
||||
// This annotation did not correctly specify "alpha", so both annotations will be checked.
|
||||
deprecatedServiceAnnotationILBBackendShare = "cloud.google.com/load-balancer-backend-share"
|
||||
|
||||
// ServiceAnnotationILBAllowGlobalAccess is annotated on a service with "true" when users
|
||||
// want to access the Internal LoadBalancer globally, and not restricted to the region it is
|
||||
// created in.
|
||||
ServiceAnnotationILBAllowGlobalAccess = "networking.gke.io/internal-load-balancer-allow-global-access"
|
||||
|
||||
// NetworkTierAnnotationKey is annotated on a Service object to indicate which
|
||||
// network tier a GCP LB should use. The valid values are "Standard" and
|
||||
// "Premium" (default).
|
||||
@ -63,23 +71,23 @@ const (
|
||||
)
|
||||
|
||||
// GetLoadBalancerAnnotationType returns the type of GCP load balancer which should be assembled.
|
||||
func GetLoadBalancerAnnotationType(service *v1.Service) (LoadBalancerType, bool) {
|
||||
v := LoadBalancerType("")
|
||||
if service.Spec.Type != v1.ServiceTypeLoadBalancer {
|
||||
return v, false
|
||||
func GetLoadBalancerAnnotationType(service *v1.Service) LoadBalancerType {
|
||||
var lbType LoadBalancerType
|
||||
for _, ann := range []string{
|
||||
ServiceAnnotationLoadBalancerType,
|
||||
deprecatedServiceAnnotationLoadBalancerType,
|
||||
} {
|
||||
if v, ok := service.Annotations[ann]; ok {
|
||||
lbType = LoadBalancerType(v)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
l, ok := service.Annotations[ServiceAnnotationLoadBalancerType]
|
||||
v = LoadBalancerType(l)
|
||||
if !ok {
|
||||
return v, false
|
||||
}
|
||||
|
||||
switch v {
|
||||
switch lbType {
|
||||
case LBTypeInternal, deprecatedTypeInternalLowerCase:
|
||||
return LBTypeInternal, true
|
||||
return LBTypeInternal
|
||||
default:
|
||||
return v, false
|
||||
return lbType
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,3 +126,16 @@ func GetServiceNetworkTier(service *v1.Service) (cloud.NetworkTier, error) {
|
||||
return cloud.NetworkTierDefault, fmt.Errorf("unsupported network tier: %q", v)
|
||||
}
|
||||
}
|
||||
|
||||
// ILBOptions represents the extra options specified when creating a
|
||||
// load balancer.
|
||||
type ILBOptions struct {
|
||||
// AllowGlobalAccess Indicates whether global access is allowed for the LoadBalancer
|
||||
AllowGlobalAccess bool
|
||||
}
|
||||
|
||||
// GetLoadBalancerAnnotationAllowGlobalAccess returns if global access is enabled
|
||||
// for the given loadbalancer service.
|
||||
func GetLoadBalancerAnnotationAllowGlobalAccess(service *v1.Service) bool {
|
||||
return service.Annotations[ServiceAnnotationILBAllowGlobalAccess] == "true"
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/filter"
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
|
||||
computealpha "google.golang.org/api/compute/v0.alpha"
|
||||
computebeta "google.golang.org/api/compute/v0.beta"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
)
|
||||
|
||||
@ -102,6 +103,16 @@ func (g *Cloud) GetAlphaRegionForwardingRule(name, region string) (*computealpha
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// GetBetaRegionForwardingRule returns the Beta forwarding rule by name & region.
|
||||
func (g *Cloud) GetBetaRegionForwardingRule(name, region string) (*computebeta.ForwardingRule, error) {
|
||||
ctx, cancel := cloud.ContextWithCallTimeout()
|
||||
defer cancel()
|
||||
|
||||
mc := newForwardingRuleMetricContextWithVersion("get", region, computeBetaVersion)
|
||||
v, err := g.c.BetaForwardingRules().Get(ctx, meta.RegionalKey(name, region))
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// ListRegionForwardingRules lists all RegionalForwardingRules in the project & region.
|
||||
func (g *Cloud) ListRegionForwardingRules(region string) ([]*compute.ForwardingRule, error) {
|
||||
ctx, cancel := cloud.ContextWithCallTimeout()
|
||||
@ -122,6 +133,16 @@ func (g *Cloud) ListAlphaRegionForwardingRules(region string) ([]*computealpha.F
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// ListBetaRegionForwardingRules lists all RegionalForwardingRules in the project & region.
|
||||
func (g *Cloud) ListBetaRegionForwardingRules(region string) ([]*computebeta.ForwardingRule, error) {
|
||||
ctx, cancel := cloud.ContextWithCallTimeout()
|
||||
defer cancel()
|
||||
|
||||
mc := newForwardingRuleMetricContextWithVersion("list", region, computeBetaVersion)
|
||||
v, err := g.c.BetaForwardingRules().List(ctx, region, filter.None)
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// CreateRegionForwardingRule creates and returns a
|
||||
// RegionalForwardingRule that points to the given BackendService
|
||||
func (g *Cloud) CreateRegionForwardingRule(rule *compute.ForwardingRule, region string) error {
|
||||
@ -133,7 +154,7 @@ func (g *Cloud) CreateRegionForwardingRule(rule *compute.ForwardingRule, region
|
||||
}
|
||||
|
||||
// CreateAlphaRegionForwardingRule creates and returns an Alpha
|
||||
// forwarding fule in the given region.
|
||||
// forwarding rule in the given region.
|
||||
func (g *Cloud) CreateAlphaRegionForwardingRule(rule *computealpha.ForwardingRule, region string) error {
|
||||
ctx, cancel := cloud.ContextWithCallTimeout()
|
||||
defer cancel()
|
||||
@ -142,6 +163,16 @@ func (g *Cloud) CreateAlphaRegionForwardingRule(rule *computealpha.ForwardingRul
|
||||
return mc.Observe(g.c.AlphaForwardingRules().Insert(ctx, meta.RegionalKey(rule.Name, region), rule))
|
||||
}
|
||||
|
||||
// CreateBetaRegionForwardingRule creates and returns a Beta
|
||||
// forwarding rule in the given region.
|
||||
func (g *Cloud) CreateBetaRegionForwardingRule(rule *computebeta.ForwardingRule, region string) error {
|
||||
ctx, cancel := cloud.ContextWithCallTimeout()
|
||||
defer cancel()
|
||||
|
||||
mc := newForwardingRuleMetricContextWithVersion("create", region, computeBetaVersion)
|
||||
return mc.Observe(g.c.BetaForwardingRules().Insert(ctx, meta.RegionalKey(rule.Name, region), rule))
|
||||
}
|
||||
|
||||
// DeleteRegionForwardingRule deletes the RegionalForwardingRule by name & region.
|
||||
func (g *Cloud) DeleteRegionForwardingRule(name, region string) error {
|
||||
ctx, cancel := cloud.ContextWithCallTimeout()
|
||||
|
@ -152,7 +152,11 @@ func (g *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc
|
||||
default:
|
||||
status, err = g.ensureExternalLoadBalancer(clusterName, clusterID, svc, existingFwdRule, nodes)
|
||||
}
|
||||
klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): done ensuring loadbalancer. err: %v", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, err)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to EnsureLoadBalancer(%s, %s, %s, %s, %s), err: %v", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, err)
|
||||
return status, err
|
||||
}
|
||||
klog.V(4).Infof("EnsureLoadBalancer(%s, %s, %s, %s, %s): done ensuring loadbalancer.", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region)
|
||||
return status, err
|
||||
}
|
||||
|
||||
@ -199,7 +203,7 @@ func (g *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin
|
||||
}
|
||||
|
||||
func getSvcScheme(svc *v1.Service) cloud.LbScheme {
|
||||
if typ, ok := GetLoadBalancerAnnotationType(svc); ok && typ == LBTypeInternal {
|
||||
if t := GetLoadBalancerAnnotationType(svc); t == LBTypeInternal {
|
||||
return cloud.SchemeInternal
|
||||
}
|
||||
return cloud.SchemeExternal
|
||||
|
@ -20,11 +20,14 @@ package gce
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
|
||||
computebeta "google.golang.org/api/compute/v0.beta"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -35,6 +38,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// Used to list instances in all states(RUNNING and other) - https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroups/listInstances
|
||||
allInstances = "ALL"
|
||||
)
|
||||
|
||||
@ -49,6 +53,12 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
|
||||
return nil, fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(protocol))
|
||||
}
|
||||
scheme := cloud.SchemeInternal
|
||||
options := getILBOptions(svc)
|
||||
if g.isLegacyNetwork {
|
||||
g.eventRecorder.Event(svc, v1.EventTypeWarning, "ILBOptionsIgnored", "Internal LoadBalancer options are not supported with Legacy Networks.")
|
||||
options = ILBOptions{}
|
||||
}
|
||||
|
||||
loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc)
|
||||
sharedBackend := shareBackendService(svc)
|
||||
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, protocol, svc.Spec.SessionAffinity)
|
||||
@ -117,46 +127,31 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
|
||||
return nil, err
|
||||
}
|
||||
|
||||
expectedFwdRule := &compute.ForwardingRule{
|
||||
Name: loadBalancerName,
|
||||
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, nm.String()),
|
||||
IPAddress: ipToUse,
|
||||
BackendService: backendServiceLink,
|
||||
Ports: ports,
|
||||
IPProtocol: string(protocol),
|
||||
LoadBalancingScheme: string(scheme),
|
||||
}
|
||||
|
||||
// Given that CreateGCECloud will attempt to determine the subnet based off the network,
|
||||
// the subnetwork should rarely be unknown.
|
||||
if subnetworkURL != "" {
|
||||
expectedFwdRule.Subnetwork = subnetworkURL
|
||||
} else {
|
||||
expectedFwdRule.Network = g.networkURL
|
||||
}
|
||||
|
||||
fwdRuleDeleted := false
|
||||
if existingFwdRule != nil && !fwdRuleEqual(existingFwdRule, expectedFwdRule) {
|
||||
klog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting existing forwarding rule with IP address %v", loadBalancerName, existingFwdRule.IPAddress)
|
||||
if err = ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fwdRuleDeleted = true
|
||||
}
|
||||
|
||||
bsDescription := makeBackendServiceDescription(nm, sharedBackend)
|
||||
err = g.ensureInternalBackendService(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, protocol, igLinks, hc.SelfLink)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If we previously deleted the forwarding rule or it never existed, finally create it.
|
||||
if fwdRuleDeleted || existingFwdRule == nil {
|
||||
klog.V(2).Infof("ensureInternalLoadBalancer(%v): creating forwarding rule", loadBalancerName)
|
||||
if err = g.CreateRegionForwardingRule(expectedFwdRule, g.region); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
klog.V(2).Infof("ensureInternalLoadBalancer(%v): created forwarding rule", loadBalancerName)
|
||||
newFRC := &forwardingRuleComposite{
|
||||
name: loadBalancerName,
|
||||
description: &forwardingRuleDescription{ServiceName: nm.String()},
|
||||
ipAddress: ipToUse,
|
||||
backendService: backendServiceLink,
|
||||
ports: ports,
|
||||
ipProtocol: string(protocol),
|
||||
lbScheme: string(scheme),
|
||||
// Given that CreateGCECloud will attempt to determine the subnet based off the network,
|
||||
// the subnetwork should rarely be unknown.
|
||||
subnetwork: subnetworkURL,
|
||||
network: g.networkURL,
|
||||
}
|
||||
if options.AllowGlobalAccess {
|
||||
newFRC.allowGlobalAccess = options.AllowGlobalAccess
|
||||
newFRC.description.APIVersion = meta.VersionBeta
|
||||
}
|
||||
if err := g.ensureInternalForwardingRule(existingFwdRule, newFRC); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Delete the previous internal load balancer resources if necessary
|
||||
@ -702,14 +697,6 @@ func backendSvcEqual(a, b *compute.BackendService) bool {
|
||||
backendsListEqual(a.Backends, b.Backends)
|
||||
}
|
||||
|
||||
func fwdRuleEqual(a, b *compute.ForwardingRule) bool {
|
||||
return (a.IPAddress == "" || b.IPAddress == "" || a.IPAddress == b.IPAddress) &&
|
||||
a.IPProtocol == b.IPProtocol &&
|
||||
a.LoadBalancingScheme == b.LoadBalancingScheme &&
|
||||
equalStringSets(a.Ports, b.Ports) &&
|
||||
a.BackendService == b.BackendService
|
||||
}
|
||||
|
||||
func getPortsAndProtocol(svcPorts []v1.ServicePort) (ports []string, protocol v1.Protocol) {
|
||||
if len(svcPorts) == 0 {
|
||||
return []string{}, v1.ProtocolUDP
|
||||
@ -747,3 +734,205 @@ func determineRequestedIP(svc *v1.Service, fwdRule *compute.ForwardingRule) stri
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func getILBOptions(svc *v1.Service) ILBOptions {
|
||||
return ILBOptions{AllowGlobalAccess: GetLoadBalancerAnnotationAllowGlobalAccess(svc)}
|
||||
}
|
||||
|
||||
// forwardingRuleComposite is a composite type encapsulating both the GA and Beta ForwardingRules.
|
||||
// It exposes methods to compute the ForwardingRule object based on the given parameters and to compare 2 composite types
|
||||
// based on the version string.
|
||||
type forwardingRuleComposite struct {
|
||||
allowGlobalAccess bool
|
||||
name string
|
||||
description *forwardingRuleDescription
|
||||
ipAddress string
|
||||
backendService string
|
||||
ports []string
|
||||
ipProtocol string
|
||||
lbScheme string
|
||||
subnetwork string
|
||||
network string
|
||||
}
|
||||
|
||||
func (f *forwardingRuleComposite) Version() meta.Version {
|
||||
return f.description.APIVersion
|
||||
}
|
||||
|
||||
func (f *forwardingRuleComposite) Equal(other *forwardingRuleComposite) bool {
|
||||
return (f.ipAddress == "" || other.ipAddress == "" || f.ipAddress == other.ipAddress) &&
|
||||
f.ipProtocol == other.ipProtocol &&
|
||||
f.lbScheme == other.lbScheme &&
|
||||
equalStringSets(f.ports, other.ports) &&
|
||||
f.backendService == other.backendService &&
|
||||
f.allowGlobalAccess == other.allowGlobalAccess
|
||||
}
|
||||
|
||||
// ToForwardingRuleComposite converts a compute beta or GA ForwardingRule into the composite type
|
||||
func ToForwardingRuleComposite(rule interface{}) (frc *forwardingRuleComposite, err error) {
|
||||
switch fr := rule.(type) {
|
||||
case *compute.ForwardingRule:
|
||||
frc = &forwardingRuleComposite{
|
||||
name: fr.Name,
|
||||
ipAddress: fr.IPAddress,
|
||||
description: &forwardingRuleDescription{APIVersion: meta.VersionGA},
|
||||
backendService: fr.BackendService,
|
||||
ports: fr.Ports,
|
||||
ipProtocol: fr.IPProtocol,
|
||||
lbScheme: fr.LoadBalancingScheme,
|
||||
subnetwork: fr.Subnetwork,
|
||||
network: fr.Network,
|
||||
}
|
||||
if fr.Description != "" {
|
||||
err = frc.description.unmarshal(fr.Description)
|
||||
}
|
||||
return frc, err
|
||||
case *computebeta.ForwardingRule:
|
||||
frc = &forwardingRuleComposite{
|
||||
name: fr.Name,
|
||||
ipAddress: fr.IPAddress,
|
||||
description: &forwardingRuleDescription{APIVersion: meta.VersionBeta},
|
||||
backendService: fr.BackendService,
|
||||
ports: fr.Ports,
|
||||
ipProtocol: fr.IPProtocol,
|
||||
lbScheme: fr.LoadBalancingScheme,
|
||||
subnetwork: fr.Subnetwork,
|
||||
network: fr.Network,
|
||||
allowGlobalAccess: fr.AllowGlobalAccess,
|
||||
}
|
||||
if fr.Description != "" {
|
||||
err = frc.description.unmarshal(fr.Description)
|
||||
}
|
||||
return frc, err
|
||||
default:
|
||||
return nil, fmt.Errorf("Invalid object type %T to compute ForwardingRuleComposite from", fr)
|
||||
}
|
||||
}
|
||||
|
||||
// ToBeta returns a Beta ForwardingRule from the composite type.
|
||||
func (f *forwardingRuleComposite) ToBeta() (*computebeta.ForwardingRule, error) {
|
||||
descStr, err := f.description.marshal()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to compute description for beta forwarding rule %s, err: %v", f.name, err)
|
||||
}
|
||||
return &computebeta.ForwardingRule{
|
||||
Name: f.name,
|
||||
Description: descStr,
|
||||
IPAddress: f.ipAddress,
|
||||
BackendService: f.backendService,
|
||||
Ports: f.ports,
|
||||
IPProtocol: f.ipProtocol,
|
||||
LoadBalancingScheme: f.lbScheme,
|
||||
Subnetwork: f.subnetwork,
|
||||
Network: f.network,
|
||||
AllowGlobalAccess: f.allowGlobalAccess,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ToGA returns a GA ForwardingRule from the composite type.
|
||||
func (f *forwardingRuleComposite) ToGA() (*compute.ForwardingRule, error) {
|
||||
descStr, err := f.description.marshal()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to compute description for GA forwarding rule %s, err: %v", f.name, err)
|
||||
}
|
||||
return &compute.ForwardingRule{
|
||||
Name: f.name,
|
||||
Description: descStr,
|
||||
IPAddress: f.ipAddress,
|
||||
BackendService: f.backendService,
|
||||
Ports: f.ports,
|
||||
IPProtocol: f.ipProtocol,
|
||||
LoadBalancingScheme: f.lbScheme,
|
||||
Subnetwork: f.subnetwork,
|
||||
Network: f.network,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type forwardingRuleDescription struct {
|
||||
ServiceName string `json:"kubernetes.io/service-name"`
|
||||
APIVersion meta.Version `json:"kubernetes.io/api-version,omitempty"`
|
||||
}
|
||||
|
||||
// marshal the description as a JSON-encoded string.
|
||||
func (d *forwardingRuleDescription) marshal() (string, error) {
|
||||
out, err := json.Marshal(d)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(out), err
|
||||
}
|
||||
|
||||
// unmarshal desc JSON-encoded string into this structure.
|
||||
func (d *forwardingRuleDescription) unmarshal(desc string) error {
|
||||
return json.Unmarshal([]byte(desc), d)
|
||||
}
|
||||
|
||||
func getFwdRuleAPIVersion(rule *compute.ForwardingRule) (meta.Version, error) {
|
||||
d := &forwardingRuleDescription{}
|
||||
if rule.Description == "" {
|
||||
return meta.VersionGA, nil
|
||||
}
|
||||
if err := d.unmarshal(rule.Description); err != nil {
|
||||
return meta.VersionGA, fmt.Errorf("Failed to get APIVersion from Forwarding rule %s - %v", rule.Name, err)
|
||||
}
|
||||
if d.APIVersion == "" {
|
||||
d.APIVersion = meta.VersionGA
|
||||
}
|
||||
return d.APIVersion, nil
|
||||
}
|
||||
|
||||
func (g *Cloud) ensureInternalForwardingRule(existingFwdRule *compute.ForwardingRule, newFRC *forwardingRuleComposite) (err error) {
|
||||
if existingFwdRule != nil {
|
||||
version, err := getFwdRuleAPIVersion(existingFwdRule)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var oldFRC *forwardingRuleComposite
|
||||
switch version {
|
||||
case meta.VersionBeta:
|
||||
var betaRule *computebeta.ForwardingRule
|
||||
betaRule, err = g.GetBetaRegionForwardingRule(existingFwdRule.Name, g.region)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
oldFRC, err = ToForwardingRuleComposite(betaRule)
|
||||
case meta.VersionGA:
|
||||
oldFRC, err = ToForwardingRuleComposite(existingFwdRule)
|
||||
default:
|
||||
klog.Errorf("invalid version string for %s, assuming GA", existingFwdRule.Name)
|
||||
oldFRC, err = ToForwardingRuleComposite(existingFwdRule)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if oldFRC.Equal(newFRC) {
|
||||
klog.V(4).Infof("oldFRC == newFRC, no updates needed (oldFRC == %+v)", oldFRC)
|
||||
return nil
|
||||
}
|
||||
klog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting existing forwarding rule with IP address %v", existingFwdRule.Name, existingFwdRule.IPAddress)
|
||||
if err = ignoreNotFound(g.DeleteRegionForwardingRule(existingFwdRule.Name, g.region)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// At this point, the existing rule has been deleted if required.
|
||||
// Create the rule based on the api version determined
|
||||
if newFRC.Version() == meta.VersionBeta {
|
||||
klog.V(2).Infof("ensureInternalLoadBalancer(%v): creating beta forwarding rule", newFRC.name)
|
||||
var betaRule *computebeta.ForwardingRule
|
||||
betaRule, err = newFRC.ToBeta()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = g.CreateBetaRegionForwardingRule(betaRule, g.region)
|
||||
} else {
|
||||
var gaRule *compute.ForwardingRule
|
||||
klog.V(2).Infof("ensureInternalLoadBalancer(%v): creating ga forwarding rule", newFRC.name)
|
||||
gaRule, err = newFRC.ToGA()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = g.CreateRegionForwardingRule(gaRule, g.region)
|
||||
}
|
||||
klog.V(2).Infof("ensureInternalLoadBalancer(%v): created forwarding rule, err : %s", newFRC.name, err)
|
||||
return err
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ package gce
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@ -29,6 +30,7 @@ import (
|
||||
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock"
|
||||
computebeta "google.golang.org/api/compute/v0.beta"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -160,6 +162,48 @@ func TestEnsureInternalLoadBalancer(t *testing.T) {
|
||||
assertInternalLbResources(t, gce, svc, vals, nodeNames)
|
||||
}
|
||||
|
||||
func TestEnsureInternalLoadBalancerDeprecatedAnnotation(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
nodeNames := []string{"test-node-1"}
|
||||
|
||||
gce, err := fakeGCECloud(vals)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
|
||||
svc := fakeLoadBalancerServiceDeprecatedAnnotation(string(LBTypeInternal))
|
||||
status, err := gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, svc, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
assertInternalLbResources(t, gce, svc, vals, nodeNames)
|
||||
|
||||
// Now add the latest annotation and change scheme to external
|
||||
svc.Annotations[ServiceAnnotationLoadBalancerType] = ""
|
||||
status, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, svc, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, false)
|
||||
assertExternalLbResources(t, gce, svc, vals, nodeNames)
|
||||
// Delete the service
|
||||
err = gce.EnsureLoadBalancerDeleted(context.Background(), vals.ClusterName, svc)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assertExternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
}
|
||||
|
||||
func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@ -917,3 +961,273 @@ func TestEnsureInternalLoadBalancerDeletedSubsetting(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
}
|
||||
|
||||
func TestEnsureInternalLoadBalancerGlobalAccess(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
|
||||
nodeNames := []string{"test-node-1"}
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
|
||||
// Change service to include the global access annotation
|
||||
svc.Annotations[ServiceAnnotationILBAllowGlobalAccess] = "true"
|
||||
status, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, svc, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
betaRuleDescString := fmt.Sprintf(`{"kubernetes.io/service-name":"%s","kubernetes.io/api-version":"beta"}`, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}.String())
|
||||
fwdRule, err := gce.GetBetaRegionForwardingRule(lbName, gce.region)
|
||||
if !fwdRule.AllowGlobalAccess {
|
||||
t.Errorf("Unexpected false value for AllowGlobalAccess")
|
||||
}
|
||||
if fwdRule.Description != betaRuleDescString {
|
||||
t.Errorf("Expected description %s, Got %s", betaRuleDescString, fwdRule.Description)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
// remove the annotation
|
||||
delete(svc.Annotations, ServiceAnnotationILBAllowGlobalAccess)
|
||||
status, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, svc, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
gaRuleDescString := fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}.String())
|
||||
fwdRule, err = gce.GetBetaRegionForwardingRule(lbName, gce.region)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if fwdRule.AllowGlobalAccess {
|
||||
t.Errorf("Unexpected true value for AllowGlobalAccess")
|
||||
}
|
||||
if fwdRule.Description != gaRuleDescString {
|
||||
t.Errorf("Expected description %s, Got %s", gaRuleDescString, fwdRule.Description)
|
||||
}
|
||||
// Delete the service
|
||||
err = gce.EnsureLoadBalancerDeleted(context.Background(), vals.ClusterName, svc)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
}
|
||||
|
||||
func TestEnsureInternalLoadBalancerDisableGlobalAccess(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
|
||||
nodeNames := []string{"test-node-1"}
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc.Annotations[ServiceAnnotationILBAllowGlobalAccess] = "true"
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
fwdRule, err := gce.GetBetaRegionForwardingRule(lbName, gce.region)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if !fwdRule.AllowGlobalAccess {
|
||||
t.Errorf("Unexpected false value for AllowGlobalAccess")
|
||||
}
|
||||
|
||||
// disable global access - setting the annotation to false or removing annotation will disable it
|
||||
svc.Annotations[ServiceAnnotationILBAllowGlobalAccess] = "false"
|
||||
status, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, svc, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
fwdRule, err = gce.GetBetaRegionForwardingRule(lbName, gce.region)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if fwdRule.AllowGlobalAccess {
|
||||
t.Errorf("Unexpected true value for AllowGlobalAccess")
|
||||
}
|
||||
|
||||
// Delete the service
|
||||
err = gce.EnsureLoadBalancerDeleted(context.Background(), vals.ClusterName, svc)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
}
|
||||
|
||||
func TestGlobalAccessChangeScheme(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
|
||||
nodeNames := []string{"test-node-1"}
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
// Change service to include the global access annotation
|
||||
svc.Annotations[ServiceAnnotationILBAllowGlobalAccess] = "true"
|
||||
status, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, svc, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
fwdRule, err := gce.GetBetaRegionForwardingRule(lbName, gce.region)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if !fwdRule.AllowGlobalAccess {
|
||||
t.Errorf("Unexpected false value for AllowGlobalAccess")
|
||||
}
|
||||
// change the scheme to externalLoadBalancer
|
||||
delete(svc.Annotations, ServiceAnnotationLoadBalancerType)
|
||||
status, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, svc, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
// Firewall is deleted when the service is deleted
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, false)
|
||||
fwdRule, err = gce.GetBetaRegionForwardingRule(lbName, gce.region)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if fwdRule.AllowGlobalAccess {
|
||||
t.Errorf("Unexpected true value for AllowGlobalAccess")
|
||||
}
|
||||
// Delete the service
|
||||
err = gce.EnsureLoadBalancerDeleted(context.Background(), vals.ClusterName, svc)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
assertExternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
}
|
||||
|
||||
func TestUnmarshalEmptyAPIVersion(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
|
||||
existingFwdRule := &compute.ForwardingRule{
|
||||
Name: lbName,
|
||||
IPAddress: "",
|
||||
Ports: []string{"123"},
|
||||
IPProtocol: "TCP",
|
||||
LoadBalancingScheme: string(cloud.SchemeInternal),
|
||||
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}.String()),
|
||||
}
|
||||
var version meta.Version
|
||||
version, err = getFwdRuleAPIVersion(existingFwdRule)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if version != meta.VersionGA {
|
||||
t.Errorf("Unexpected version %s", version)
|
||||
}
|
||||
}
|
||||
|
||||
func TestForwardingRuleCompositeEqual(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
gaRule := &compute.ForwardingRule{
|
||||
Name: lbName,
|
||||
IPAddress: "",
|
||||
Ports: []string{"123"},
|
||||
IPProtocol: "TCP",
|
||||
LoadBalancingScheme: string(cloud.SchemeInternal),
|
||||
}
|
||||
betaRule := &computebeta.ForwardingRule{
|
||||
Name: lbName + "-beta",
|
||||
IPAddress: "",
|
||||
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s","apiVersion":"beta"}`, svc.Name),
|
||||
Ports: []string{"123"},
|
||||
IPProtocol: "TCP",
|
||||
LoadBalancingScheme: string(cloud.SchemeInternal),
|
||||
AllowGlobalAccess: false,
|
||||
}
|
||||
betaRuleGlobalAccess := &computebeta.ForwardingRule{
|
||||
Name: lbName + "-globalaccess",
|
||||
IPAddress: "",
|
||||
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s","apiVersion":"beta"}`, svc.Name),
|
||||
Ports: []string{"123"},
|
||||
IPProtocol: "TCP",
|
||||
LoadBalancingScheme: string(cloud.SchemeInternal),
|
||||
AllowGlobalAccess: true,
|
||||
}
|
||||
err = gce.CreateRegionForwardingRule(gaRule, gce.region)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
err = gce.CreateBetaRegionForwardingRule(betaRule, gce.region)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
err = gce.CreateBetaRegionForwardingRule(betaRuleGlobalAccess, gce.region)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
frcGA, err := ToForwardingRuleComposite(gaRule)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
frcBeta, err := ToForwardingRuleComposite(betaRule)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
frcBetaGlobalAccess, err := ToForwardingRuleComposite(betaRuleGlobalAccess)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if !frcGA.Equal(frcBeta) {
|
||||
t.Errorf("Expected frcGA and frcBeta rules to be equal, got false")
|
||||
}
|
||||
if frcBeta.Equal(frcBetaGlobalAccess) {
|
||||
t.Errorf("Expected FrcBeta and FrcBetaGlobalAccess rules to be unequal, got true")
|
||||
}
|
||||
if frcGA.Equal(frcBetaGlobalAccess) {
|
||||
t.Errorf("Expected frcGA and frcBetaGlobalAccess rules to be unequal, got true")
|
||||
}
|
||||
// Enabling globalAccess in FrcBeta to make equality fail with FrcGA
|
||||
frcBeta.allowGlobalAccess = true
|
||||
if frcGA.Equal(frcBeta) {
|
||||
t.Errorf("Expected frcGA and frcBeta rules to be unequal, got true")
|
||||
}
|
||||
}
|
||||
|
@ -50,10 +50,18 @@ const (
|
||||
)
|
||||
|
||||
func fakeLoadbalancerService(lbType string) *v1.Service {
|
||||
return fakeLoadbalancerServiceHelper(lbType, ServiceAnnotationLoadBalancerType)
|
||||
}
|
||||
|
||||
func fakeLoadBalancerServiceDeprecatedAnnotation(lbType string) *v1.Service {
|
||||
return fakeLoadbalancerServiceHelper(lbType, deprecatedServiceAnnotationLoadBalancerType)
|
||||
}
|
||||
|
||||
func fakeLoadbalancerServiceHelper(lbType string, annotationKey string) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "",
|
||||
Annotations: map[string]string{ServiceAnnotationLoadBalancerType: lbType},
|
||||
Annotations: map[string]string{annotationKey: lbType},
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
SessionAffinity: v1.ServiceAffinityClientIP,
|
||||
|
Loading…
Reference in New Issue
Block a user