Azure load balancer general improvement
parent 2cbb07a439
commit edfb2ad552
@@ -13,6 +13,7 @@ go_library(
         "azure_backoff.go",
         "azure_blobDiskController.go",
         "azure_controllerCommon.go",
+        "azure_fakes.go",
         "azure_file.go",
         "azure_instance_metadata.go",
         "azure_instances.go",
@@ -44,13 +44,14 @@ import (

 const (
     // CloudProviderName is the value used for the --cloud-provider flag
-    CloudProviderName      = "azure"
-    rateLimitQPSDefault    = 1.0
-    rateLimitBucketDefault = 5
-    backoffRetriesDefault  = 6
-    backoffExponentDefault = 1.5
-    backoffDurationDefault = 5 // in seconds
-    backoffJitterDefault   = 1.0
+    CloudProviderName            = "azure"
+    rateLimitQPSDefault          = 1.0
+    rateLimitBucketDefault       = 5
+    backoffRetriesDefault        = 6
+    backoffExponentDefault       = 1.5
+    backoffDurationDefault       = 5 // in seconds
+    backoffJitterDefault         = 1.0
+    maximumLoadBalancerRuleCount = 148 // According to Azure LB rule default limit
 )

 // Config holds the configuration parsed from the --cloud-config flag
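As a minimal sketch (not part of this commit) of how such defaults are typically consumed, this is the wait.Backoff an operator would end up with if the values above were wired through unchanged:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Hypothetical wiring of the defaults above; it mirrors the usual
	// resourceRequestBackoff construction but is an illustration only.
	backoff := wait.Backoff{
		Steps:    6,               // backoffRetriesDefault
		Factor:   1.5,             // backoffExponentDefault
		Duration: 5 * time.Second, // backoffDurationDefault is in seconds
		Jitter:   1.0,             // backoffJitterDefault
	}
	fmt.Printf("%+v\n", backoff)
}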
@@ -113,6 +114,51 @@ type Config struct {

     // Use managed service identity for the virtual machine to access Azure ARM APIs
     UseManagedIdentityExtension bool `json:"useManagedIdentityExtension"`
+
+    // Maximum allowed LoadBalancer Rule Count is the limit enforced by Azure Load balancer
+    MaximumLoadBalancerRuleCount int `json:"maximumLoadBalancerRuleCount"`
 }
+
+type iVirtualMachinesClient interface {
+    CreateOrUpdate(resourceGroupName string, VMName string, parameters compute.VirtualMachine, cancel <-chan struct{}) (<-chan compute.VirtualMachine, <-chan error)
+    Get(resourceGroupName string, VMName string, expand compute.InstanceViewTypes) (result compute.VirtualMachine, err error)
+    List(resourceGroupName string) (result compute.VirtualMachineListResult, err error)
+    ListNextResults(lastResults compute.VirtualMachineListResult) (result compute.VirtualMachineListResult, err error)
+}
+
+type iInterfacesClient interface {
+    CreateOrUpdate(resourceGroupName string, networkInterfaceName string, parameters network.Interface, cancel <-chan struct{}) (<-chan network.Interface, <-chan error)
+    Get(resourceGroupName string, networkInterfaceName string, expand string) (result network.Interface, err error)
+}
+
+type iLoadBalancersClient interface {
+    CreateOrUpdate(resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, cancel <-chan struct{}) (<-chan network.LoadBalancer, <-chan error)
+    Delete(resourceGroupName string, loadBalancerName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error)
+    Get(resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error)
+    List(resourceGroupName string) (result network.LoadBalancerListResult, err error)
+    ListNextResults(lastResult network.LoadBalancerListResult) (result network.LoadBalancerListResult, err error)
+}
+
+type iPublicIPAddressesClient interface {
+    CreateOrUpdate(resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress, cancel <-chan struct{}) (<-chan network.PublicIPAddress, <-chan error)
+    Delete(resourceGroupName string, publicIPAddressName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error)
+    Get(resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error)
+    List(resourceGroupName string) (result network.PublicIPAddressListResult, err error)
+    ListNextResults(lastResults network.PublicIPAddressListResult) (result network.PublicIPAddressListResult, err error)
+}
+
+type iSubnetsClient interface {
+    CreateOrUpdate(resourceGroupName string, virtualNetworkName string, subnetName string, subnetParameters network.Subnet, cancel <-chan struct{}) (<-chan network.Subnet, <-chan error)
+    Delete(resourceGroupName string, virtualNetworkName string, subnetName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error)
+    Get(resourceGroupName string, virtualNetworkName string, subnetName string, expand string) (result network.Subnet, err error)
+    List(resourceGroupName string, virtualNetworkName string) (result network.SubnetListResult, err error)
+}
+
+type iSecurityGroupsClient interface {
+    CreateOrUpdate(resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, cancel <-chan struct{}) (<-chan network.SecurityGroup, <-chan error)
+    Delete(resourceGroupName string, networkSecurityGroupName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error)
+    Get(resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error)
+    List(resourceGroupName string) (result network.SecurityGroupListResult, err error)
+}
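The point of these small i*Client interfaces is testability: the Cloud struct can hold either the real ARM SDK clients or the fakes added in azure_fakes.go later in this diff. A hypothetical test sketch (the test itself is not in this commit):

package azure

import "testing"

func TestFakeClientInjection(t *testing.T) {
	az := &Cloud{}
	// The fakes defined later in this diff satisfy the interfaces,
	// so no real ARM endpoint is needed.
	az.LoadBalancerClient = NewFakeAzureLBClient()
	az.PublicIPAddressesClient = NewFakeAzurePIPClient("fake-subscription-id")

	// Get against an empty fake store reports not-found.
	if _, err := az.LoadBalancerClient.Get("rg", "lb", ""); err == nil {
		t.Fatal("expected a not-found error from the fake client")
	}
}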
 // Cloud holds the config and clients
@@ -120,13 +166,13 @@ type Cloud struct {
     Config
     Environment             azure.Environment
     RoutesClient            network.RoutesClient
-    SubnetsClient           network.SubnetsClient
-    InterfacesClient        network.InterfacesClient
+    SubnetsClient           iSubnetsClient
+    InterfacesClient        iInterfacesClient
     RouteTablesClient       network.RouteTablesClient
-    LoadBalancerClient      network.LoadBalancersClient
-    PublicIPAddressesClient network.PublicIPAddressesClient
-    SecurityGroupsClient    network.SecurityGroupsClient
-    VirtualMachinesClient   compute.VirtualMachinesClient
+    LoadBalancerClient      iLoadBalancersClient
+    PublicIPAddressesClient iPublicIPAddressesClient
+    SecurityGroupsClient    iSecurityGroupsClient
+    VirtualMachinesClient   iVirtualMachinesClient
     StorageAccountClient    storage.AccountsClient
     DisksClient             disk.DisksClient
     operationPollRateLimiter flowcontrol.RateLimiter
@@ -221,11 +267,12 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
         return nil, err
     }

-    az.SubnetsClient = network.NewSubnetsClient(az.SubscriptionID)
-    az.SubnetsClient.BaseURI = az.Environment.ResourceManagerEndpoint
-    az.SubnetsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
-    az.SubnetsClient.PollingDelay = 5 * time.Second
-    configureUserAgent(&az.SubnetsClient.Client)
+    subnetsClient := network.NewSubnetsClient(az.SubscriptionID)
+    subnetsClient.BaseURI = az.Environment.ResourceManagerEndpoint
+    subnetsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
+    subnetsClient.PollingDelay = 5 * time.Second
+    configureUserAgent(&subnetsClient.Client)
+    az.SubnetsClient = subnetsClient

     az.RouteTablesClient = network.NewRouteTablesClient(az.SubscriptionID)
     az.RouteTablesClient.BaseURI = az.Environment.ResourceManagerEndpoint
@@ -239,35 +286,40 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
     az.RoutesClient.PollingDelay = 5 * time.Second
     configureUserAgent(&az.RoutesClient.Client)

-    az.InterfacesClient = network.NewInterfacesClient(az.SubscriptionID)
-    az.InterfacesClient.BaseURI = az.Environment.ResourceManagerEndpoint
-    az.InterfacesClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
-    az.InterfacesClient.PollingDelay = 5 * time.Second
-    configureUserAgent(&az.InterfacesClient.Client)
+    interfacesClient := network.NewInterfacesClient(az.SubscriptionID)
+    interfacesClient.BaseURI = az.Environment.ResourceManagerEndpoint
+    interfacesClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
+    interfacesClient.PollingDelay = 5 * time.Second
+    configureUserAgent(&interfacesClient.Client)
+    az.InterfacesClient = interfacesClient

-    az.LoadBalancerClient = network.NewLoadBalancersClient(az.SubscriptionID)
-    az.LoadBalancerClient.BaseURI = az.Environment.ResourceManagerEndpoint
-    az.LoadBalancerClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
-    az.LoadBalancerClient.PollingDelay = 5 * time.Second
-    configureUserAgent(&az.LoadBalancerClient.Client)
+    loadBalancerClient := network.NewLoadBalancersClient(az.SubscriptionID)
+    loadBalancerClient.BaseURI = az.Environment.ResourceManagerEndpoint
+    loadBalancerClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
+    loadBalancerClient.PollingDelay = 5 * time.Second
+    configureUserAgent(&loadBalancerClient.Client)
+    az.LoadBalancerClient = loadBalancerClient

-    az.VirtualMachinesClient = compute.NewVirtualMachinesClient(az.SubscriptionID)
-    az.VirtualMachinesClient.BaseURI = az.Environment.ResourceManagerEndpoint
-    az.VirtualMachinesClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
-    az.VirtualMachinesClient.PollingDelay = 5 * time.Second
-    configureUserAgent(&az.VirtualMachinesClient.Client)
+    virtualMachinesClient := compute.NewVirtualMachinesClient(az.SubscriptionID)
+    virtualMachinesClient.BaseURI = az.Environment.ResourceManagerEndpoint
+    virtualMachinesClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
+    virtualMachinesClient.PollingDelay = 5 * time.Second
+    configureUserAgent(&virtualMachinesClient.Client)
+    az.VirtualMachinesClient = virtualMachinesClient

-    az.PublicIPAddressesClient = network.NewPublicIPAddressesClient(az.SubscriptionID)
-    az.PublicIPAddressesClient.BaseURI = az.Environment.ResourceManagerEndpoint
-    az.PublicIPAddressesClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
-    az.PublicIPAddressesClient.PollingDelay = 5 * time.Second
-    configureUserAgent(&az.PublicIPAddressesClient.Client)
+    publicIPAddressClient := network.NewPublicIPAddressesClient(az.SubscriptionID)
+    publicIPAddressClient.BaseURI = az.Environment.ResourceManagerEndpoint
+    publicIPAddressClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
+    publicIPAddressClient.PollingDelay = 5 * time.Second
+    configureUserAgent(&publicIPAddressClient.Client)
+    az.PublicIPAddressesClient = publicIPAddressClient

-    az.SecurityGroupsClient = network.NewSecurityGroupsClient(az.SubscriptionID)
-    az.SecurityGroupsClient.BaseURI = az.Environment.ResourceManagerEndpoint
-    az.SecurityGroupsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
-    az.SecurityGroupsClient.PollingDelay = 5 * time.Second
-    configureUserAgent(&az.SecurityGroupsClient.Client)
+    securityGroupsClient := network.NewSecurityGroupsClient(az.SubscriptionID)
+    securityGroupsClient.BaseURI = az.Environment.ResourceManagerEndpoint
+    securityGroupsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
+    securityGroupsClient.PollingDelay = 5 * time.Second
+    configureUserAgent(&securityGroupsClient.Client)
+    az.SecurityGroupsClient = securityGroupsClient

     az.StorageAccountClient = storage.NewAccountsClientWithBaseURI(az.Environment.ResourceManagerEndpoint, az.SubscriptionID)
     az.StorageAccountClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
@@ -327,6 +379,10 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {

     az.metadata = NewInstanceMetadata()

+    if az.MaximumLoadBalancerRuleCount == 0 {
+        az.MaximumLoadBalancerRuleCount = maximumLoadBalancerRuleCount
+    }
+
     if err := initDiskControllers(&az); err != nil {
         return nil, err
     }
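The repeated build-then-assign pattern above is forced by the interface change: fields such as BaseURI or PollingDelay are not reachable through iSubnetsClient, so the concrete SDK client must be configured first and stored afterwards. A sketch of the same idea as a hypothetical helper (the commit inlines this logic in NewCloud instead):

package azure

import (
	"time"

	"github.com/Azure/azure-sdk-for-go/arm/network"
)

// newConfiguredSubnetsClient is a hypothetical helper, not part of
// this commit; it only illustrates the configure-then-store pattern.
func newConfiguredSubnetsClient(subscriptionID, baseURI string) iSubnetsClient {
	c := network.NewSubnetsClient(subscriptionID)
	c.BaseURI = baseURI              // settable only on the concrete type
	c.PollingDelay = 5 * time.Second // likewise
	return c // network.SubnetsClient satisfies iSubnetsClient
}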
@@ -26,11 +26,25 @@ import (
     "k8s.io/apimachinery/pkg/types"
 )

+// getorCreateRequestBackoff returns a new Backoff object with Steps = 1.
+// This is to make sure that the requested command executes
+// at least once.
+func (az *Cloud) getorCreateRequestBackoff() (resourceRequestBackoff wait.Backoff) {
+    if az.CloudProviderBackoff {
+        return az.resourceRequestBackoff
+    }
+    resourceRequestBackoff = wait.Backoff{
+        Steps: 1,
+    }
+
+    return resourceRequestBackoff
+}
+
 // GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry
 func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.VirtualMachine, bool, error) {
     var machine compute.VirtualMachine
     var exists bool
-    err := wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    err := wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         var retryErr error
         machine, exists, retryErr = az.getVirtualMachine(name)
         if retryErr != nil {
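getorCreateRequestBackoff is what keeps these helpers usable when CloudProviderBackoff is disabled: a wait.Backoff with Steps: 1 makes wait.ExponentialBackoff run the condition exactly once. A self-contained sketch of that contract (illustration, not part of the commit):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	calls := 0
	err := wait.ExponentialBackoff(wait.Backoff{Steps: 1}, func() (bool, error) {
		calls++
		return false, nil // not done, but there is no budget to retry
	})
	// Prints: 1 true — one attempt, then ErrWaitTimeout instead of a retry.
	fmt.Println(calls, err == wait.ErrWaitTimeout)
}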
@@ -46,8 +60,9 @@ func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.Virtua
 // VirtualMachineClientGetWithRetry invokes az.VirtualMachinesClient.Get with exponential backoff retry
 func (az *Cloud) VirtualMachineClientGetWithRetry(resourceGroup, vmName string, types compute.InstanceViewTypes) (compute.VirtualMachine, error) {
     var machine compute.VirtualMachine
-    err := wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    err := wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         var retryErr error
+        az.operationPollRateLimiter.Accept()
         machine, retryErr = az.VirtualMachinesClient.Get(resourceGroup, vmName, types)
         if retryErr != nil {
             glog.Errorf("backoff: failure, will retry,err=%v", retryErr)
@@ -59,10 +74,63 @@ func (az *Cloud) VirtualMachineClientGetWithRetry(resourceGroup, vmName string,
     return machine, err
 }

+// VirtualMachineClientListWithRetry invokes az.VirtualMachinesClient.List with exponential backoff retry
+func (az *Cloud) VirtualMachineClientListWithRetry() ([]compute.VirtualMachine, error) {
+    allNodes := []compute.VirtualMachine{}
+    var result compute.VirtualMachineListResult
+    err := wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
+        var retryErr error
+        az.operationPollRateLimiter.Accept()
+        glog.V(10).Infof("VirtualMachinesClient.List(%v): start", az.ResourceGroup)
+        result, retryErr = az.VirtualMachinesClient.List(az.ResourceGroup)
+        glog.V(10).Infof("VirtualMachinesClient.List(%v): end", az.ResourceGroup)
+        if retryErr != nil {
+            glog.Errorf("VirtualMachinesClient.List(%v) - backoff: failure, will retry,err=%v",
+                az.ResourceGroup,
+                retryErr)
+            return false, retryErr
+        }
+        glog.V(2).Infof("VirtualMachinesClient.List(%v) - backoff: success", az.ResourceGroup)
+        return true, nil
+    })
+    if err != nil {
+        return nil, err
+    }
+
+    appendResults := (result.Value != nil && len(*result.Value) > 0)
+    for appendResults {
+        allNodes = append(allNodes, *result.Value...)
+        appendResults = false
+        // follow the next link to get all the vms for resource group
+        if result.NextLink != nil {
+            err := wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
+                var retryErr error
+                az.operationPollRateLimiter.Accept()
+                glog.V(10).Infof("VirtualMachinesClient.ListNextResults(%v): start", az.ResourceGroup)
+                result, retryErr = az.VirtualMachinesClient.ListNextResults(result)
+                glog.V(10).Infof("VirtualMachinesClient.ListNextResults(%v): end", az.ResourceGroup)
+                if retryErr != nil {
+                    glog.Errorf("VirtualMachinesClient.ListNextResults(%v) - backoff: failure, will retry,err=%v",
+                        az.ResourceGroup, retryErr)
+                    return false, retryErr
+                }
+                glog.V(2).Infof("VirtualMachinesClient.ListNextResults(%v): success", az.ResourceGroup)
+                return true, nil
+            })
+            if err != nil {
+                return allNodes, err
+            }
+            appendResults = (result.Value != nil && len(*result.Value) > 0)
+        }
+    }
+
+    return allNodes, err
+}
+
 // GetIPForMachineWithRetry invokes az.getIPForMachine with exponential backoff retry
 func (az *Cloud) GetIPForMachineWithRetry(name types.NodeName) (string, error) {
     var ip string
-    err := wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    err := wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         var retryErr error
         ip, retryErr = az.getIPForMachine(name)
         if retryErr != nil {
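The List/NextLink loop above is the generic ARM paging shape; reduced to toy types it looks like this (illustration only, not SDK code):

package main

import "fmt"

type page struct {
	items    []string
	nextLink *string
}

func main() {
	next := "page2"
	fetch := map[string]page{
		"first": {items: []string{"vm-0", "vm-1"}, nextLink: &next},
		"page2": {items: []string{"vm-2"}},
	}
	var all []string
	cur := fetch["first"]
	for cur.items != nil && len(cur.items) > 0 {
		all = append(all, cur.items...)
		if cur.nextLink == nil {
			break
		}
		cur = fetch[*cur.nextLink] // stands in for ListNextResults
	}
	fmt.Println(all) // [vm-0 vm-1 vm-2]
}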
@@ -77,7 +145,7 @@ func (az *Cloud) GetIPForMachineWithRetry(name types.NodeName) (string, error) {

 // CreateOrUpdateSGWithRetry invokes az.SecurityGroupsClient.CreateOrUpdate with exponential backoff retry
 func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): start", *sg.Name)
         respChan, errChan := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
@@ -90,7 +158,7 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error {

 // CreateOrUpdateLBWithRetry invokes az.LoadBalancerClient.CreateOrUpdate with exponential backoff retry
 func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): start", *lb.Name)
         respChan, errChan := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
@@ -101,9 +169,120 @@ func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error {
     })
 }

+// ListLBWithRetry invokes az.LoadBalancerClient.List with exponential backoff retry
+func (az *Cloud) ListLBWithRetry() ([]network.LoadBalancer, error) {
+    allLBs := []network.LoadBalancer{}
+    var result network.LoadBalancerListResult
+
+    err := wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
+        var retryErr error
+        az.operationPollRateLimiter.Accept()
+        glog.V(10).Infof("LoadBalancerClient.List(%v): start", az.ResourceGroup)
+        result, retryErr = az.LoadBalancerClient.List(az.ResourceGroup)
+        glog.V(10).Infof("LoadBalancerClient.List(%v): end", az.ResourceGroup)
+        if retryErr != nil {
+            glog.Errorf("LoadBalancerClient.List(%v) - backoff: failure, will retry,err=%v",
+                az.ResourceGroup,
+                retryErr)
+            return false, retryErr
+        }
+        glog.V(2).Infof("LoadBalancerClient.List(%v) - backoff: success", az.ResourceGroup)
+        return true, nil
+    })
+    if err != nil {
+        return nil, err
+    }
+
+    appendResults := (result.Value != nil && len(*result.Value) > 0)
+    for appendResults {
+        allLBs = append(allLBs, *result.Value...)
+        appendResults = false
+
+        // follow the next link to get all the load balancers for the resource group
+        if result.NextLink != nil {
+            err := wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
+                var retryErr error
+                az.operationPollRateLimiter.Accept()
+                glog.V(10).Infof("LoadBalancerClient.ListNextResults(%v): start", az.ResourceGroup)
+                result, retryErr = az.LoadBalancerClient.ListNextResults(result)
+                glog.V(10).Infof("LoadBalancerClient.ListNextResults(%v): end", az.ResourceGroup)
+                if retryErr != nil {
+                    glog.Errorf("LoadBalancerClient.ListNextResults(%v) - backoff: failure, will retry,err=%v",
+                        az.ResourceGroup,
+                        retryErr)
+                    return false, retryErr
+                }
+                glog.V(2).Infof("LoadBalancerClient.ListNextResults(%v) - backoff: success", az.ResourceGroup)
+                return true, nil
+            })
+            if err != nil {
+                return allLBs, err
+            }
+            appendResults = (result.Value != nil && len(*result.Value) > 0)
+        }
+    }
+
+    return allLBs, nil
+}
+
+// ListPIPWithRetry lists the PIP resources in az.ResourceGroup
+func (az *Cloud) ListPIPWithRetry() ([]network.PublicIPAddress, error) {
+    allPIPs := []network.PublicIPAddress{}
+    var result network.PublicIPAddressListResult
+    err := wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
+        var retryErr error
+        az.operationPollRateLimiter.Accept()
+        glog.V(10).Infof("PublicIPAddressesClient.List(%v): start", az.ResourceGroup)
+        result, retryErr = az.PublicIPAddressesClient.List(az.ResourceGroup)
+        glog.V(10).Infof("PublicIPAddressesClient.List(%v): end", az.ResourceGroup)
+        if retryErr != nil {
+            glog.Errorf("PublicIPAddressesClient.List(%v) - backoff: failure, will retry,err=%v",
+                az.ResourceGroup,
+                retryErr)
+            return false, retryErr
+        }
+        glog.V(2).Infof("PublicIPAddressesClient.List(%v) - backoff: success", az.ResourceGroup)
+        return true, nil
+    })
+    if err != nil {
+        return nil, err
+    }
+
+    appendResults := (result.Value != nil && len(*result.Value) > 0)
+    for appendResults {
+        allPIPs = append(allPIPs, *result.Value...)
+        appendResults = false
+
+        // follow the next link to get all the public IP addresses for the resource group
+        if result.NextLink != nil {
+            err := wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
+                var retryErr error
+                az.operationPollRateLimiter.Accept()
+                glog.V(10).Infof("PublicIPAddressesClient.ListNextResults(%v): start", az.ResourceGroup)
+                result, retryErr = az.PublicIPAddressesClient.ListNextResults(result)
+                glog.V(10).Infof("PublicIPAddressesClient.ListNextResults(%v): end", az.ResourceGroup)
+                if retryErr != nil {
+                    glog.Errorf("PublicIPAddressesClient.ListNextResults(%v) - backoff: failure, will retry,err=%v",
+                        az.ResourceGroup,
+                        retryErr)
+                    return false, retryErr
+                }
+                glog.V(2).Infof("PublicIPAddressesClient.ListNextResults(%v) - backoff: success", az.ResourceGroup)
+                return true, nil
+            })
+            if err != nil {
+                return allPIPs, err
+            }
+            appendResults = (result.Value != nil && len(*result.Value) > 0)
+        }
+    }
+
+    return allPIPs, nil
+}
+
 // CreateOrUpdatePIPWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry
 func (az *Cloud) CreateOrUpdatePIPWithRetry(pip network.PublicIPAddress) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("PublicIPAddressesClient.CreateOrUpdate(%s): start", *pip.Name)
         respChan, errChan := az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil)
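Callers get a plain slice back from these listing helpers, with paging and retries hidden. A hypothetical consumer (findPIPByIP is not part of the commit):

package azure

import "github.com/golang/glog"

// findPIPByIP is a hypothetical example of consuming ListPIPWithRetry.
func (az *Cloud) findPIPByIP(ip string) (string, bool) {
	pips, err := az.ListPIPWithRetry()
	if err != nil {
		glog.Errorf("ListPIPWithRetry failed: %v", err)
		return "", false
	}
	for _, pip := range pips {
		if pip.PublicIPAddressPropertiesFormat != nil &&
			pip.IPAddress != nil && *pip.IPAddress == ip {
			return *pip.Name, true
		}
	}
	return "", false
}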
@@ -116,7 +295,7 @@ func (az *Cloud) CreateOrUpdatePIPWithRetry(pip network.PublicIPAddress) error {

 // CreateOrUpdateInterfaceWithRetry invokes az.InterfacesClient.CreateOrUpdate with exponential backoff retry
 func (az *Cloud) CreateOrUpdateInterfaceWithRetry(nic network.Interface) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%s): start", *nic.Name)
         respChan, errChan := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
@@ -129,7 +308,7 @@ func (az *Cloud) CreateOrUpdateInterfaceWithRetry(nic network.Interface) error {

 // DeletePublicIPWithRetry invokes az.PublicIPAddressesClient.Delete with exponential backoff retry
 func (az *Cloud) DeletePublicIPWithRetry(pipName string) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("PublicIPAddressesClient.Delete(%s): start", pipName)
         respChan, errChan := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
@@ -142,7 +321,7 @@ func (az *Cloud) DeletePublicIPWithRetry(pipName string) error {

 // DeleteLBWithRetry invokes az.LoadBalancerClient.Delete with exponential backoff retry
 func (az *Cloud) DeleteLBWithRetry(lbName string) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("LoadBalancerClient.Delete(%s): start", lbName)
         respChan, errChan := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
@@ -155,7 +334,7 @@ func (az *Cloud) DeleteLBWithRetry(lbName string) error {

 // CreateOrUpdateRouteTableWithRetry invokes az.RouteTablesClient.CreateOrUpdate with exponential backoff retry
 func (az *Cloud) CreateOrUpdateRouteTableWithRetry(routeTable network.RouteTable) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("RouteTablesClient.CreateOrUpdate(%s): start", *routeTable.Name)
         respChan, errChan := az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil)
@@ -168,7 +347,7 @@ func (az *Cloud) CreateOrUpdateRouteTableWithRetry(routeTable network.RouteTable

 // CreateOrUpdateRouteWithRetry invokes az.RoutesClient.CreateOrUpdate with exponential backoff retry
 func (az *Cloud) CreateOrUpdateRouteWithRetry(route network.Route) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("RoutesClient.CreateOrUpdate(%s): start", *route.Name)
         respChan, errChan := az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil)
@@ -181,7 +360,7 @@ func (az *Cloud) CreateOrUpdateRouteWithRetry(route network.Route) error {

 // DeleteRouteWithRetry invokes az.RoutesClient.Delete with exponential backoff retry
 func (az *Cloud) DeleteRouteWithRetry(routeName string) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("RoutesClient.Delete(%s): start", az.RouteTableName)
         respChan, errChan := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil)
@@ -194,7 +373,7 @@ func (az *Cloud) DeleteRouteWithRetry(routeName string) error {

 // CreateOrUpdateVMWithRetry invokes az.VirtualMachinesClient.CreateOrUpdate with exponential backoff retry
 func (az *Cloud) CreateOrUpdateVMWithRetry(vmName string, newVM compute.VirtualMachine) error {
-    return wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    return wait.ExponentialBackoff(az.getorCreateRequestBackoff(), func() (bool, error) {
         az.operationPollRateLimiter.Accept()
         glog.V(10).Infof("VirtualMachinesClient.CreateOrUpdate(%s): start", vmName)
         respChan, errChan := az.VirtualMachinesClient.CreateOrUpdate(az.ResourceGroup, vmName, newVM, nil)
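Each of the *WithRetry hunks above truncates just after the CreateOrUpdate/Delete call; what the elided lines do is drain the SDK's channel pair. A sketch of that shape (hypothetical reduction; the real bodies also log the end of the call and classify the error for retry):

package azure

import "github.com/Azure/azure-sdk-for-go/arm/network"

// createOrUpdateLBOnce illustrates one attempt: the old arm SDK
// returns a (result, error) channel pair that must both be received.
func (az *Cloud) createOrUpdateLBOnce(lb network.LoadBalancer) error {
	respChan, errChan := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
	<-respChan       // wait for the (possibly zero-value) result
	return <-errChan // nil means done; non-nil lets the backoff retry
}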
pkg/cloudprovider/providers/azure/azure_fakes.go (new file, 584 lines)
@@ -0,0 +1,584 @@
package azure

import (
    "fmt"
    "math/rand"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/Azure/go-autorest/autorest/to"

    "github.com/Azure/azure-sdk-for-go/arm/compute"
    "github.com/Azure/azure-sdk-for-go/arm/network"
    "github.com/Azure/go-autorest/autorest"
)

type fakeAzureLBClient struct {
    mutex     *sync.Mutex
    FakeStore map[string]map[string]network.LoadBalancer
}

func NewFakeAzureLBClient() fakeAzureLBClient {
    fLBC := fakeAzureLBClient{}
    fLBC.FakeStore = make(map[string]map[string]network.LoadBalancer)
    fLBC.mutex = &sync.Mutex{}
    return fLBC
}

func (fLBC fakeAzureLBClient) CreateOrUpdate(resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, cancel <-chan struct{}) (<-chan network.LoadBalancer, <-chan error) {
    fLBC.mutex.Lock()
    defer fLBC.mutex.Unlock()
    resultChan := make(chan network.LoadBalancer, 1)
    errChan := make(chan error, 1)
    var result network.LoadBalancer
    var err error
    defer func() {
        resultChan <- result
        errChan <- err
        close(resultChan)
        close(errChan)
    }()
    if _, ok := fLBC.FakeStore[resourceGroupName]; !ok {
        fLBC.FakeStore[resourceGroupName] = make(map[string]network.LoadBalancer)
    }

    // For dynamic ip allocation, just fill in the PrivateIPAddress
    if parameters.FrontendIPConfigurations != nil {
        for idx, config := range *parameters.FrontendIPConfigurations {
            if config.PrivateIPAllocationMethod == network.Dynamic {
                (*parameters.FrontendIPConfigurations)[idx].PrivateIPAddress = to.StringPtr("10.0.0.19")
            }
        }
    }
    fLBC.FakeStore[resourceGroupName][loadBalancerName] = parameters
    result = fLBC.FakeStore[resourceGroupName][loadBalancerName]
    err = nil
    return resultChan, errChan
}

func (fLBC fakeAzureLBClient) Delete(resourceGroupName string, loadBalancerName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) {
    fLBC.mutex.Lock()
    defer fLBC.mutex.Unlock()
    respChan := make(chan autorest.Response, 1)
    errChan := make(chan error, 1)
    var resp autorest.Response
    var err error
    defer func() {
        respChan <- resp
        errChan <- err
        close(respChan)
        close(errChan)
    }()
    if _, ok := fLBC.FakeStore[resourceGroupName]; ok {
        if _, ok := fLBC.FakeStore[resourceGroupName][loadBalancerName]; ok {
            delete(fLBC.FakeStore[resourceGroupName], loadBalancerName)
            resp.Response = &http.Response{
                StatusCode: http.StatusAccepted,
            }
            err = nil
            return respChan, errChan
        }
    }
    resp.Response = &http.Response{
        StatusCode: http.StatusNotFound,
    }
    err = autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such LB",
    }
    return respChan, errChan
}

func (fLBC fakeAzureLBClient) Get(resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error) {
    fLBC.mutex.Lock()
    defer fLBC.mutex.Unlock()
    if _, ok := fLBC.FakeStore[resourceGroupName]; ok {
        if entity, ok := fLBC.FakeStore[resourceGroupName][loadBalancerName]; ok {
            return entity, nil
        }
    }
    return result, autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such LB",
    }
}

func (fLBC fakeAzureLBClient) List(resourceGroupName string) (result network.LoadBalancerListResult, err error) {
    fLBC.mutex.Lock()
    defer fLBC.mutex.Unlock()
    var value []network.LoadBalancer
    if _, ok := fLBC.FakeStore[resourceGroupName]; ok {
        for _, v := range fLBC.FakeStore[resourceGroupName] {
            value = append(value, v)
        }
    }
    result.Response.Response = &http.Response{
        StatusCode: http.StatusOK,
    }
    result.NextLink = nil
    result.Value = &value
    return result, nil
}

func (fLBC fakeAzureLBClient) ListNextResults(lastResult network.LoadBalancerListResult) (result network.LoadBalancerListResult, err error) {
    fLBC.mutex.Lock()
    defer fLBC.mutex.Unlock()
    result.Response.Response = &http.Response{
        StatusCode: http.StatusOK,
    }
    result.NextLink = nil
    result.Value = nil
    return result, nil
}
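A hypothetical round-trip against this fake (not part of the commit) shows the behavior it encodes: CreateOrUpdate stores the LB and fills in a private IP for Dynamic frontends, so a later Get sees it:

package azure

import (
	"testing"

	"github.com/Azure/azure-sdk-for-go/arm/network"
	"github.com/Azure/go-autorest/autorest/to"
)

func TestFakeLBRoundTrip(t *testing.T) {
	fake := NewFakeAzureLBClient()
	lb := network.LoadBalancer{
		Name: to.StringPtr("lb1"),
		LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{
			FrontendIPConfigurations: &[]network.FrontendIPConfiguration{{
				Name: to.StringPtr("fe"),
				FrontendIPConfigurationPropertiesFormat: &network.FrontendIPConfigurationPropertiesFormat{
					PrivateIPAllocationMethod: network.Dynamic,
				},
			}},
		},
	}
	// Drain both channels, as real callers must.
	respChan, errChan := fake.CreateOrUpdate("rg", "lb1", lb, nil)
	<-respChan
	if err := <-errChan; err != nil {
		t.Fatal(err)
	}
	got, err := fake.Get("rg", "lb1", "")
	if err != nil {
		t.Fatal(err)
	}
	fe := (*got.FrontendIPConfigurations)[0]
	if fe.PrivateIPAddress == nil {
		t.Fatal("fake should have assigned a private IP for a Dynamic frontend")
	}
}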
type fakeAzurePIPClient struct {
    mutex          *sync.Mutex
    FakeStore      map[string]map[string]network.PublicIPAddress
    SubscriptionID string
}

const publicIPAddressIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/publicIPAddresses/%s"

// returns the full identifier of a publicIPAddress.
func getpublicIPAddressID(subscriptionID string, resourceGroupName, pipName string) string {
    return fmt.Sprintf(
        publicIPAddressIDTemplate,
        subscriptionID,
        resourceGroupName,
        pipName)
}

func NewFakeAzurePIPClient(subscriptionID string) fakeAzurePIPClient {
    fAPC := fakeAzurePIPClient{}
    fAPC.FakeStore = make(map[string]map[string]network.PublicIPAddress)
    fAPC.SubscriptionID = subscriptionID
    fAPC.mutex = &sync.Mutex{}
    return fAPC
}

func (fAPC fakeAzurePIPClient) CreateOrUpdate(resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress, cancel <-chan struct{}) (<-chan network.PublicIPAddress, <-chan error) {
    fAPC.mutex.Lock()
    defer fAPC.mutex.Unlock()
    resultChan := make(chan network.PublicIPAddress, 1)
    errChan := make(chan error, 1)
    var result network.PublicIPAddress
    var err error
    defer func() {
        resultChan <- result
        errChan <- err
        close(resultChan)
        close(errChan)
    }()
    if _, ok := fAPC.FakeStore[resourceGroupName]; !ok {
        fAPC.FakeStore[resourceGroupName] = make(map[string]network.PublicIPAddress)
    }

    // assign id
    pipID := getpublicIPAddressID(fAPC.SubscriptionID, resourceGroupName, publicIPAddressName)
    parameters.ID = &pipID

    // only create in the case user has not provided
    if parameters.PublicIPAddressPropertiesFormat != nil &&
        parameters.PublicIPAddressPropertiesFormat.PublicIPAllocationMethod == network.Static {
        // assign ip
        rand.Seed(time.Now().UnixNano())
        randomIP := fmt.Sprintf("%d.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256))
        parameters.IPAddress = &randomIP
    }

    fAPC.FakeStore[resourceGroupName][publicIPAddressName] = parameters
    result = fAPC.FakeStore[resourceGroupName][publicIPAddressName]
    err = nil
    return resultChan, errChan
}

func (fAPC fakeAzurePIPClient) Delete(resourceGroupName string, publicIPAddressName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) {
    fAPC.mutex.Lock()
    defer fAPC.mutex.Unlock()
    respChan := make(chan autorest.Response, 1)
    errChan := make(chan error, 1)
    var resp autorest.Response
    var err error
    defer func() {
        respChan <- resp
        errChan <- err
        close(respChan)
        close(errChan)
    }()
    if _, ok := fAPC.FakeStore[resourceGroupName]; ok {
        if _, ok := fAPC.FakeStore[resourceGroupName][publicIPAddressName]; ok {
            delete(fAPC.FakeStore[resourceGroupName], publicIPAddressName)
            resp.Response = &http.Response{
                StatusCode: http.StatusAccepted,
            }
            err = nil
            return respChan, errChan
        }
    }
    resp.Response = &http.Response{
        StatusCode: http.StatusNotFound,
    }
    err = autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such PIP",
    }
    return respChan, errChan
}

func (fAPC fakeAzurePIPClient) Get(resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error) {
    fAPC.mutex.Lock()
    defer fAPC.mutex.Unlock()
    if _, ok := fAPC.FakeStore[resourceGroupName]; ok {
        if entity, ok := fAPC.FakeStore[resourceGroupName][publicIPAddressName]; ok {
            return entity, nil
        }
    }
    return result, autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such PIP",
    }
}

func (fAPC fakeAzurePIPClient) ListNextResults(lastResults network.PublicIPAddressListResult) (result network.PublicIPAddressListResult, err error) {
    fAPC.mutex.Lock()
    defer fAPC.mutex.Unlock()
    return network.PublicIPAddressListResult{}, nil
}

func (fAPC fakeAzurePIPClient) List(resourceGroupName string) (result network.PublicIPAddressListResult, err error) {
    fAPC.mutex.Lock()
    defer fAPC.mutex.Unlock()
    var value []network.PublicIPAddress
    if _, ok := fAPC.FakeStore[resourceGroupName]; ok {
        for _, v := range fAPC.FakeStore[resourceGroupName] {
            value = append(value, v)
        }
    }
    result.Response.Response = &http.Response{
        StatusCode: http.StatusOK,
    }
    result.NextLink = nil
    result.Value = &value
    return result, nil
}

type fakeInterfacesClient struct {
    mutex     *sync.Mutex
    FakeStore map[string]map[string]network.Interface
}

func NewFakeInterfacesClient() fakeInterfacesClient {
    fIC := fakeInterfacesClient{}
    fIC.FakeStore = make(map[string]map[string]network.Interface)
    fIC.mutex = &sync.Mutex{}

    return fIC
}

func (fIC fakeInterfacesClient) CreateOrUpdate(resourceGroupName string, networkInterfaceName string, parameters network.Interface, cancel <-chan struct{}) (<-chan network.Interface, <-chan error) {
    fIC.mutex.Lock()
    defer fIC.mutex.Unlock()
    resultChan := make(chan network.Interface, 1)
    errChan := make(chan error, 1)
    var result network.Interface
    var err error
    defer func() {
        resultChan <- result
        errChan <- err
        close(resultChan)
        close(errChan)
    }()
    if _, ok := fIC.FakeStore[resourceGroupName]; !ok {
        fIC.FakeStore[resourceGroupName] = make(map[string]network.Interface)
    }
    fIC.FakeStore[resourceGroupName][networkInterfaceName] = parameters
    result = fIC.FakeStore[resourceGroupName][networkInterfaceName]
    err = nil

    return resultChan, errChan
}

func (fIC fakeInterfacesClient) Get(resourceGroupName string, networkInterfaceName string, expand string) (result network.Interface, err error) {
    fIC.mutex.Lock()
    defer fIC.mutex.Unlock()
    if _, ok := fIC.FakeStore[resourceGroupName]; ok {
        if entity, ok := fIC.FakeStore[resourceGroupName][networkInterfaceName]; ok {
            return entity, nil
        }
    }
    return result, autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such Interface",
    }
}

type fakeVirtualMachinesClient struct {
    mutex     *sync.Mutex
    FakeStore map[string]map[string]compute.VirtualMachine
}

func NewFakeVirtualMachinesClient() fakeVirtualMachinesClient {
    fVMC := fakeVirtualMachinesClient{}
    fVMC.FakeStore = make(map[string]map[string]compute.VirtualMachine)
    fVMC.mutex = &sync.Mutex{}
    return fVMC
}

func (fVMC fakeVirtualMachinesClient) CreateOrUpdate(resourceGroupName string, VMName string, parameters compute.VirtualMachine, cancel <-chan struct{}) (<-chan compute.VirtualMachine, <-chan error) {
    fVMC.mutex.Lock()
    defer fVMC.mutex.Unlock()
    resultChan := make(chan compute.VirtualMachine, 1)
    errChan := make(chan error, 1)
    var result compute.VirtualMachine
    var err error
    defer func() {
        resultChan <- result
        errChan <- err
        close(resultChan)
        close(errChan)
    }()
    if _, ok := fVMC.FakeStore[resourceGroupName]; !ok {
        fVMC.FakeStore[resourceGroupName] = make(map[string]compute.VirtualMachine)
    }
    fVMC.FakeStore[resourceGroupName][VMName] = parameters
    result = fVMC.FakeStore[resourceGroupName][VMName]
    err = nil
    return resultChan, errChan
}

func (fVMC fakeVirtualMachinesClient) Get(resourceGroupName string, VMName string, expand compute.InstanceViewTypes) (result compute.VirtualMachine, err error) {
    fVMC.mutex.Lock()
    defer fVMC.mutex.Unlock()
    if _, ok := fVMC.FakeStore[resourceGroupName]; ok {
        if entity, ok := fVMC.FakeStore[resourceGroupName][VMName]; ok {
            return entity, nil
        }
    }
    return result, autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such VM",
    }
}

func (fVMC fakeVirtualMachinesClient) List(resourceGroupName string) (result compute.VirtualMachineListResult, err error) {
    fVMC.mutex.Lock()
    defer fVMC.mutex.Unlock()
    var value []compute.VirtualMachine
    if _, ok := fVMC.FakeStore[resourceGroupName]; ok {
        for _, v := range fVMC.FakeStore[resourceGroupName] {
            value = append(value, v)
        }
    }
    result.Response.Response = &http.Response{
        StatusCode: http.StatusOK,
    }
    result.NextLink = nil
    result.Value = &value
    return result, nil
}
func (fVMC fakeVirtualMachinesClient) ListNextResults(lastResults compute.VirtualMachineListResult) (result compute.VirtualMachineListResult, err error) {
    fVMC.mutex.Lock()
    defer fVMC.mutex.Unlock()
    return compute.VirtualMachineListResult{}, nil
}

type fakeAzureSubnetsClient struct {
    mutex     *sync.Mutex
    FakeStore map[string]map[string]network.Subnet
}

func NewFakeAzureSubnetsClient() fakeAzureSubnetsClient {
    fASC := fakeAzureSubnetsClient{}
    fASC.FakeStore = make(map[string]map[string]network.Subnet)
    fASC.mutex = &sync.Mutex{}
    return fASC
}

func (fASC fakeAzureSubnetsClient) CreateOrUpdate(resourceGroupName string, virtualNetworkName string, subnetName string, subnetParameters network.Subnet, cancel <-chan struct{}) (<-chan network.Subnet, <-chan error) {
    fASC.mutex.Lock()
    defer fASC.mutex.Unlock()
    resultChan := make(chan network.Subnet, 1)
    errChan := make(chan error, 1)
    var result network.Subnet
    var err error
    defer func() {
        resultChan <- result
        errChan <- err
        close(resultChan)
        close(errChan)
    }()
    rgVnet := strings.Join([]string{resourceGroupName, virtualNetworkName}, "AND")
    if _, ok := fASC.FakeStore[rgVnet]; !ok {
        fASC.FakeStore[rgVnet] = make(map[string]network.Subnet)
    }
    fASC.FakeStore[rgVnet][subnetName] = subnetParameters
    result = fASC.FakeStore[rgVnet][subnetName]
    err = nil
    return resultChan, errChan
}

func (fASC fakeAzureSubnetsClient) Delete(resourceGroupName string, virtualNetworkName string, subnetName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) {
    fASC.mutex.Lock()
    defer fASC.mutex.Unlock()
    respChan := make(chan autorest.Response, 1)
    errChan := make(chan error, 1)
    var resp autorest.Response
    var err error
    defer func() {
        respChan <- resp
        errChan <- err
        close(respChan)
        close(errChan)
    }()

    rgVnet := strings.Join([]string{resourceGroupName, virtualNetworkName}, "AND")
    if _, ok := fASC.FakeStore[rgVnet]; ok {
        if _, ok := fASC.FakeStore[rgVnet][subnetName]; ok {
            delete(fASC.FakeStore[rgVnet], subnetName)
            resp.Response = &http.Response{
                StatusCode: http.StatusAccepted,
            }
            err = nil
            return respChan, errChan
        }
    }
    resp.Response = &http.Response{
        StatusCode: http.StatusNotFound,
    }
    err = autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such Subnet",
    }
    return respChan, errChan
}
func (fASC fakeAzureSubnetsClient) Get(resourceGroupName string, virtualNetworkName string, subnetName string, expand string) (result network.Subnet, err error) {
    fASC.mutex.Lock()
    defer fASC.mutex.Unlock()
    rgVnet := strings.Join([]string{resourceGroupName, virtualNetworkName}, "AND")
    if _, ok := fASC.FakeStore[rgVnet]; ok {
        if entity, ok := fASC.FakeStore[rgVnet][subnetName]; ok {
            return entity, nil
        }
    }
    return result, autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such Subnet",
    }
}
func (fASC fakeAzureSubnetsClient) List(resourceGroupName string, virtualNetworkName string) (result network.SubnetListResult, err error) {
    fASC.mutex.Lock()
    defer fASC.mutex.Unlock()
    rgVnet := strings.Join([]string{resourceGroupName, virtualNetworkName}, "AND")
    var value []network.Subnet
    if _, ok := fASC.FakeStore[rgVnet]; ok {
        for _, v := range fASC.FakeStore[rgVnet] {
            value = append(value, v)
        }
    }
    result.Response.Response = &http.Response{
        StatusCode: http.StatusOK,
    }
    result.NextLink = nil
    result.Value = &value
    return result, nil
}
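One design note on the subnets fake above: subnets are scoped by both resource group and vnet, and rather than nesting maps the fake flattens the pair into a single key with a literal "AND" separator. A tiny sketch of the key it produces (collisions are possible if names themselves contain "AND", which is acceptable for test data):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Same composite key fakeAzureSubnetsClient builds.
	rgVnet := strings.Join([]string{"myRG", "myVnet"}, "AND")
	fmt.Println(rgVnet) // myRGANDmyVnet
}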
type fakeAzureNSGClient struct {
    mutex     *sync.Mutex
    FakeStore map[string]map[string]network.SecurityGroup
}

func NewFakeAzureNSGClient() fakeAzureNSGClient {
    fNSG := fakeAzureNSGClient{}
    fNSG.FakeStore = make(map[string]map[string]network.SecurityGroup)
    fNSG.mutex = &sync.Mutex{}
    return fNSG
}

func (fNSG fakeAzureNSGClient) CreateOrUpdate(resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, cancel <-chan struct{}) (<-chan network.SecurityGroup, <-chan error) {
    fNSG.mutex.Lock()
    defer fNSG.mutex.Unlock()
    resultChan := make(chan network.SecurityGroup, 1)
    errChan := make(chan error, 1)
    var result network.SecurityGroup
    var err error
    defer func() {
        resultChan <- result
        errChan <- err
        close(resultChan)
        close(errChan)
    }()
    if _, ok := fNSG.FakeStore[resourceGroupName]; !ok {
        fNSG.FakeStore[resourceGroupName] = make(map[string]network.SecurityGroup)
    }
    fNSG.FakeStore[resourceGroupName][networkSecurityGroupName] = parameters
    result = fNSG.FakeStore[resourceGroupName][networkSecurityGroupName]
    err = nil
    return resultChan, errChan
}

func (fNSG fakeAzureNSGClient) Delete(resourceGroupName string, networkSecurityGroupName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) {
    fNSG.mutex.Lock()
    defer fNSG.mutex.Unlock()
    respChan := make(chan autorest.Response, 1)
    errChan := make(chan error, 1)
    var resp autorest.Response
    var err error
    defer func() {
        respChan <- resp
        errChan <- err
        close(respChan)
        close(errChan)
    }()
    if _, ok := fNSG.FakeStore[resourceGroupName]; ok {
        if _, ok := fNSG.FakeStore[resourceGroupName][networkSecurityGroupName]; ok {
            delete(fNSG.FakeStore[resourceGroupName], networkSecurityGroupName)
            resp.Response = &http.Response{
                StatusCode: http.StatusAccepted,
            }
            err = nil
            return respChan, errChan
        }
    }
    resp.Response = &http.Response{
        StatusCode: http.StatusNotFound,
    }
    err = autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such NSG",
    }
    return respChan, errChan
}

func (fNSG fakeAzureNSGClient) Get(resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error) {
    fNSG.mutex.Lock()
    defer fNSG.mutex.Unlock()
    if _, ok := fNSG.FakeStore[resourceGroupName]; ok {
        if entity, ok := fNSG.FakeStore[resourceGroupName][networkSecurityGroupName]; ok {
            return entity, nil
        }
    }
    return result, autorest.DetailedError{
        StatusCode: http.StatusNotFound,
        Message:    "Not such NSG",
    }
}

func (fNSG fakeAzureNSGClient) List(resourceGroupName string) (result network.SecurityGroupListResult, err error) {
    fNSG.mutex.Lock()
    defer fNSG.mutex.Unlock()
    var value []network.SecurityGroup
    if _, ok := fNSG.FakeStore[resourceGroupName]; ok {
        for _, v := range fNSG.FakeStore[resourceGroupName] {
            value = append(value, v)
        }
    }
    result.Response.Response = &http.Response{
        StatusCode: http.StatusOK,
    }
    result.NextLink = nil
    result.Value = &value
    return result, nil
}
@@ -199,39 +199,6 @@ func (az *Cloud) CurrentNodeName(hostname string) (types.NodeName, error) {
     return types.NodeName(hostname), nil
 }

-func (az *Cloud) listAllNodesInResourceGroup() ([]compute.VirtualMachine, error) {
-    allNodes := []compute.VirtualMachine{}
-
-    az.operationPollRateLimiter.Accept()
-    glog.V(10).Infof("VirtualMachinesClient.List(%s): start", az.ResourceGroup)
-    result, err := az.VirtualMachinesClient.List(az.ResourceGroup)
-    glog.V(10).Infof("VirtualMachinesClient.List(%s): end", az.ResourceGroup)
-    if err != nil {
-        glog.Errorf("error: az.listAllNodesInResourceGroup(), az.VirtualMachinesClient.List(%s), err=%v", az.ResourceGroup, err)
-        return nil, err
-    }
-
-    morePages := (result.Value != nil && len(*result.Value) > 1)
-
-    for morePages {
-        allNodes = append(allNodes, *result.Value...)
-
-        az.operationPollRateLimiter.Accept()
-        glog.V(10).Infof("VirtualMachinesClient.ListAllNextResults(%v): start", az.ResourceGroup)
-        result, err = az.VirtualMachinesClient.ListAllNextResults(result)
-        glog.V(10).Infof("VirtualMachinesClient.ListAllNextResults(%v): end", az.ResourceGroup)
-        if err != nil {
-            glog.Errorf("error: az.listAllNodesInResourceGroup(), az.VirtualMachinesClient.ListAllNextResults(%v), err=%v", result, err)
-            return nil, err
-        }
-
-        morePages = (result.Value != nil && len(*result.Value) > 1)
-    }
-
-    return allNodes, nil
-
-}
-
 // mapNodeNameToVMName maps a k8s NodeName to an Azure VM Name
 // This is a simple string cast.
 func mapNodeNameToVMName(nodeName types.NodeName) string {
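This removal is more than a move: the deleted loop guarded both its first append and its continuation with len(*result.Value) > 1, so a listing (or final page) that held exactly one VM dropped that VM. The replacement VirtualMachineClientListWithRetry in azure_backoff.go uses > 0. The difference, reduced to a sketch:

package main

import "fmt"

func main() {
	page := []string{"only-vm"} // a page holding exactly one VM
	oldGuard := len(page) > 1   // false: the removed code never appends it
	newGuard := len(page) > 0   // true: the replacement consumes the page
	fmt.Println(oldGuard, newGuard) // false true
}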
@@ -39,86 +39,31 @@ const ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-
 // to specify what subnet it is exposed on
 const ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/azure-load-balancer-internal-subnet"

+// ServiceAnnotationLoadBalancerMode is the annotation used on the service to specify the
+// Azure load balancer selection based on availability sets
+const ServiceAnnotationLoadBalancerMode = "service.beta.kubernetes.io/azure-load-balancer-mode"
+
+// ServiceAnnotationLoadBalancerAutoModeValue is the annotation value used on the service to specify
+// automatic Azure load balancer selection from the availability sets
+const ServiceAnnotationLoadBalancerAutoModeValue = "__auto__"
+
+// ServiceAnnotationDNSLabelName is the annotation specifying the DNS label name for the service.
+const ServiceAnnotationDNSLabelName = "service.beta.kubernetes.io/azure-dns-label-name"
+
 // GetLoadBalancer returns whether the specified load balancer exists, and
 // if so, what its status is.
 func (az *Cloud) GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) {
-    isInternal := requiresInternalLoadBalancer(service)
-    lbName := getLoadBalancerName(clusterName, isInternal)
-    serviceName := getServiceName(service)
-
-    lb, existsLb, err := az.getAzureLoadBalancer(lbName)
+    _, status, exists, err = az.getServiceLoadBalancer(service, clusterName, nil, false)
     if err != nil {
         return nil, false, err
     }
-    if !existsLb {
-        glog.V(5).Infof("get(%s): lb(%s) - doesn't exist", serviceName, lbName)
-        return nil, false, nil
+    if exists == false {
+        serviceName := getServiceName(service)
+        glog.V(5).Infof("getloadbalancer (cluster:%s) (service:%s)- IP doesn't exist in any of the lbs", clusterName, serviceName)
+        return nil, false, fmt.Errorf("Service(%s) - Loadbalancer not found", serviceName)
     }

-    var lbIP *string
-
-    if isInternal {
-        lbFrontendIPConfigName := getFrontendIPConfigName(service, subnet(service))
-        for _, ipConfiguration := range *lb.FrontendIPConfigurations {
-            if lbFrontendIPConfigName == *ipConfiguration.Name {
-                lbIP = ipConfiguration.PrivateIPAddress
-                break
-            }
-        }
-    } else {
-        // TODO: Consider also read address from lb's FrontendIPConfigurations
-        pipName, err := az.determinePublicIPName(clusterName, service)
-        if err != nil {
-            return nil, false, err
-        }
-        pip, existsPip, err := az.getPublicIPAddress(pipName)
-        if err != nil {
-            return nil, false, err
-        }
-        if existsPip {
-            lbIP = pip.IPAddress
-        }
-    }
-
-    if lbIP == nil {
-        glog.V(5).Infof("get(%s): lb(%s) - IP doesn't exist", serviceName, lbName)
-        return nil, false, nil
-    }
-
-    return &v1.LoadBalancerStatus{
-        Ingress: []v1.LoadBalancerIngress{{IP: *lbIP}},
-    }, true, nil
-}
-
-func (az *Cloud) determinePublicIPName(clusterName string, service *v1.Service) (string, error) {
-    loadBalancerIP := service.Spec.LoadBalancerIP
-    if len(loadBalancerIP) == 0 {
-        return getPublicIPName(clusterName, service), nil
-    }
-
-    az.operationPollRateLimiter.Accept()
-    glog.V(10).Infof("PublicIPAddressesClient.List(%v): start", az.ResourceGroup)
-    list, err := az.PublicIPAddressesClient.List(az.ResourceGroup)
-    glog.V(10).Infof("PublicIPAddressesClient.List(%v): end", az.ResourceGroup)
-    if err != nil {
-        return "", err
-    }
-
-    if list.Value != nil {
-        for ix := range *list.Value {
-            ip := &(*list.Value)[ix]
-            if ip.PublicIPAddressPropertiesFormat.IPAddress != nil &&
-                *ip.PublicIPAddressPropertiesFormat.IPAddress == loadBalancerIP {
-                return *ip.Name, nil
-            }
-        }
-    }
-    // TODO: follow next link here? Will there really ever be that many public IPs?
-
-    return "", fmt.Errorf("user supplied IP Address %s was not found", loadBalancerIP)
+    return status, true, nil
 }

 func getPublicIPLabel(service *v1.Service) string {
@@ -130,193 +75,35 @@ func getPublicIPLabel(service *v1.Service) string {

 // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
 func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
-    isInternal := requiresInternalLoadBalancer(service)
-    lbName := getLoadBalancerName(clusterName, isInternal)
-
     // When a client updates the internal load balancer annotation,
     // the service may be switched from an internal LB to a public one, or vice versa.
     // Here we'll first ensure the service does not lie in the opposite LB.
-    err := az.cleanupLoadBalancer(clusterName, service, !isInternal)
-    if err != nil {
-        return nil, err
-    }
-
     serviceName := getServiceName(service)
-    glog.V(5).Infof("ensure(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName)
+    glog.V(5).Infof("ensureloadbalancer(%s): START clusterName=%q", serviceName, clusterName)
+    flipedService := flipServiceInternalAnnotation(service)
+    if _, err := az.reconcileLoadBalancer(clusterName, flipedService, nil, false /* wantLb */); err != nil {
+        return nil, err
+    }

-    lb, existsLb, err := az.getAzureLoadBalancer(lbName)
+    if _, err := az.reconcilePublicIP(clusterName, service, true /* wantLb */); err != nil {
+        return nil, err
+    }

+    lb, err := az.reconcileLoadBalancer(clusterName, service, nodes, true /* wantLb */)
     if err != nil {
         return nil, err
     }
-    if !existsLb {
-        lb = network.LoadBalancer{
-            Name:     &lbName,
-            Location: &az.Location,
-            LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{},
-        }
-    }
-
-    var lbIP *string
-    var fipConfigurationProperties *network.FrontendIPConfigurationPropertiesFormat
-
-    if isInternal {
-        subnetName := subnet(service)
-        if subnetName == nil {
-            subnetName = &az.SubnetName
-        }
-        subnet, existsSubnet, err := az.getSubnet(az.VnetName, *subnetName)
-        if err != nil {
-            return nil, err
-        }
-
-        if !existsSubnet {
-            return nil, fmt.Errorf("ensure(%s): lb(%s) - failed to get subnet: %s/%s", serviceName, lbName, az.VnetName, az.SubnetName)
-        }
-
-        configProperties := network.FrontendIPConfigurationPropertiesFormat{
-            Subnet: &network.Subnet{
-                ID: subnet.ID,
-            },
-        }
-
-        loadBalancerIP := service.Spec.LoadBalancerIP
-        if loadBalancerIP != "" {
-            configProperties.PrivateIPAllocationMethod = network.Static
-            configProperties.PrivateIPAddress = &loadBalancerIP
-            lbIP = &loadBalancerIP
-        } else {
-            // We'll need to call GetLoadBalancer later to retrieve allocated IP.
-            configProperties.PrivateIPAllocationMethod = network.Dynamic
-        }
-
-        fipConfigurationProperties = &configProperties
-    } else {
-        pipName, err := az.determinePublicIPName(clusterName, service)
-        if err != nil {
-            return nil, err
-        }
-        domainNameLabel := getPublicIPLabel(service)
-        pip, err := az.ensurePublicIPExists(serviceName, pipName, domainNameLabel)
-        if err != nil {
-            return nil, err
-        }
-
-        lbIP = pip.IPAddress
-        fipConfigurationProperties = &network.FrontendIPConfigurationPropertiesFormat{
-            PublicIPAddress: &network.PublicIPAddress{ID: pip.ID},
-        }
-    }
-
-    lb, lbNeedsUpdate, err := az.reconcileLoadBalancer(lb, fipConfigurationProperties, clusterName, service, nodes)
+    lbStatus, err := az.getServiceLoadBalancerStatus(service, lb)
     if err != nil {
         return nil, err
     }
-    if !existsLb || lbNeedsUpdate {
-        glog.V(3).Infof("ensure(%s): lb(%s) - updating", serviceName, lbName)
|
||||
az.operationPollRateLimiter.Accept()
|
||||
glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%q): start", *lb.Name)
|
||||
respChan, errChan := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
|
||||
resp := <-respChan
|
||||
err := <-errChan
|
||||
glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%q): end", *lb.Name)
|
||||
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
|
||||
glog.V(2).Infof("ensure(%s) backing off: lb(%s) - updating", serviceName, lbName)
|
||||
retryErr := az.CreateOrUpdateLBWithRetry(lb)
|
||||
if retryErr != nil {
|
||||
glog.V(2).Infof("ensure(%s) abort backoff: lb(%s) - updating", serviceName, lbName)
|
||||
return nil, retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var lbStatus *v1.LoadBalancerStatus
|
||||
if lbIP == nil {
|
||||
lbStatus, exists, err := az.GetLoadBalancer(clusterName, service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("ensure(%s): lb(%s) - failed to get back load balancer", serviceName, lbName)
|
||||
}
|
||||
lbIP = &lbStatus.Ingress[0].IP
|
||||
}
|
||||
|
||||
az.operationPollRateLimiter.Accept()
|
||||
glog.V(10).Infof("SecurityGroupsClient.Get(%q): start", az.SecurityGroupName)
|
||||
sg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "")
|
||||
glog.V(10).Infof("SecurityGroupsClient.Get(%q): end", az.SecurityGroupName)
|
||||
if err != nil {
|
||||
if _, err := az.reconcileSecurityGroup(clusterName, service, lbStatus, true /* wantLb */); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sg, sgNeedsUpdate, err := az.reconcileSecurityGroup(sg, clusterName, service, lbIP, true /* wantLb */)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if sgNeedsUpdate {
|
||||
glog.V(3).Infof("ensure(%s): sg(%s) - updating", serviceName, *sg.Name)
|
||||
az.operationPollRateLimiter.Accept()
|
||||
glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%q): start", *sg.Name)
|
||||
respChan, errChan := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
|
||||
resp := <-respChan
|
||||
err := <-errChan
|
||||
glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%q): end", *sg.Name)
|
||||
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
|
||||
glog.V(2).Infof("ensure(%s) backing off: sg(%s) - updating", serviceName, *sg.Name)
|
||||
retryErr := az.CreateOrUpdateSGWithRetry(sg)
|
||||
if retryErr != nil {
|
||||
glog.V(2).Infof("ensure(%s) abort backoff: sg(%s) - updating", serviceName, *sg.Name)
|
||||
return nil, retryErr
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Add the machines to the backend pool if they're not already
|
||||
lbBackendName := getBackendPoolName(clusterName)
|
||||
lbBackendPoolID := az.getBackendPoolID(lbName, lbBackendName)
|
||||
hostUpdates := make([]func() error, len(nodes))
|
||||
for i, node := range nodes {
|
||||
localNodeName := node.Name
|
||||
f := func() error {
|
||||
err := az.ensureHostInPool(serviceName, types.NodeName(localNodeName), lbBackendPoolID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ensure(%s): lb(%s) - failed to ensure host in pool: %q", serviceName, lbName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
hostUpdates[i] = f
|
||||
}
|
||||
|
||||
errs := utilerrors.AggregateGoroutines(hostUpdates...)
|
||||
if errs != nil {
|
||||
return nil, utilerrors.Flatten(errs)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("ensure(%s): lb(%s) finished", serviceName, lbName)
|
||||
|
||||
if lbStatus != nil {
|
||||
return lbStatus, nil
|
||||
}
|
||||
|
||||
if lbIP == nil {
|
||||
lbStatus, exists, err := az.GetLoadBalancer(clusterName, service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("ensure(%s): lb(%s) - failed to get back load balancer", serviceName, lbName)
|
||||
}
|
||||
return lbStatus, nil
|
||||
}
|
||||
|
||||
return &v1.LoadBalancerStatus{
|
||||
Ingress: []v1.LoadBalancerIngress{{IP: *lbIP}},
|
||||
}, nil
|
||||
return lbStatus, nil
|
||||
}
|
||||
|
||||
// UpdateLoadBalancer updates hosts under the specified load balancer.
@@ -332,146 +119,152 @@ func (az *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nod
// have multiple underlying components, meaning a Get could say that the LB
// doesn't exist even if some part of it is still laying around.
func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error {
isInternal := requiresInternalLoadBalancer(service)
lbName := getLoadBalancerName(clusterName, isInternal)
serviceName := getServiceName(service)

glog.V(5).Infof("delete(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName)

err := az.cleanupLoadBalancer(clusterName, service, isInternal)
if err != nil {
glog.V(5).Infof("delete(%s): START clusterName=%q", serviceName, clusterName)
if _, err := az.reconcileSecurityGroup(clusterName, service, nil, false /* wantLb */); err != nil {
return err
}

sg, existsSg, err := az.getSecurityGroup()
if err != nil {
if _, err := az.reconcileLoadBalancer(clusterName, service, nil, false /* wantLb */); err != nil {
return err
}
if existsSg {
reconciledSg, sgNeedsUpdate, reconcileErr := az.reconcileSecurityGroup(sg, clusterName, service, nil, false /* wantLb */)
if reconcileErr != nil {
return reconcileErr
}
if sgNeedsUpdate {
glog.V(3).Infof("delete(%s): sg(%s) - updating", serviceName, az.SecurityGroupName)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%q): start", *reconciledSg.Name)
respChan, errChan := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil)
resp := <-respChan
err := <-errChan
glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%q): end", *reconciledSg.Name)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("delete(%s) backing off: sg(%s) - updating", serviceName, az.SecurityGroupName)
retryErr := az.CreateOrUpdateSGWithRetry(reconciledSg)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("delete(%s) abort backoff: sg(%s) - updating", serviceName, az.SecurityGroupName)
}
}
if err != nil {
return err
}
}

if _, err := az.reconcilePublicIP(clusterName, service, false /* wantLb */); err != nil {
return err
}

glog.V(2).Infof("delete(%s): FINISH", serviceName)
return nil
}

func (az *Cloud) cleanupLoadBalancer(clusterName string, service *v1.Service, isInternalLb bool) error {
lbName := getLoadBalancerName(clusterName, isInternalLb)
serviceName := getServiceName(service)
// getServiceLoadBalancer gets the load balancer for the service if it already exists.
// If wantLb is TRUE then it selects a new load balancer.
// In case the selected load balancer does not exist, it returns a network.LoadBalancer struct
// with added metadata (such as name and location) and existsLB set to FALSE.
// By default, the cluster default LB is returned.
func (az *Cloud) getServiceLoadBalancer(service *v1.Service, clusterName string, nodes []*v1.Node, wantLb bool) (lb *network.LoadBalancer, status *v1.LoadBalancerStatus, exists bool, err error) {
isInternal := requiresInternalLoadBalancer(service)
var defaultLB *network.LoadBalancer
defaultLBName := az.getLoadBalancerName(clusterName, az.Config.PrimaryAvailabilitySetName, isInternal)

glog.V(10).Infof("ensure lb deleted: clusterName=%q, serviceName=%s, lbName=%q", clusterName, serviceName, lbName)

lb, existsLb, err := az.getAzureLoadBalancer(lbName)
lbs, err := az.ListLBWithRetry()
if err != nil {
return err
return nil, nil, false, err
}
if existsLb {
var publicIPToCleanup *string

if !isInternalLb {
// Find public ip resource to clean up from IP configuration
lbFrontendIPConfigName := getFrontendIPConfigName(service, nil)
for _, config := range *lb.FrontendIPConfigurations {
if strings.EqualFold(*config.Name, lbFrontendIPConfigName) {
if config.PublicIPAddress != nil {
// Only ID property is available
publicIPToCleanup = config.PublicIPAddress.ID
}
break
}
if lbs != nil {
for lbx := range lbs {
lb := &(lbs[lbx])
if strings.EqualFold(*lb.Name, defaultLBName) {
defaultLB = lb
}
if isInternalLoadBalancer(lb) != isInternal {
continue
}
status, err = az.getServiceLoadBalancerStatus(service, lb)
if err != nil {
return nil, nil, false, err
}
if status == nil {
// service is not on this load balancer
continue
}

return lb, status, true, nil
}
}
// service does not have a load balancer, select one
if wantLb {
// select a new load balancer for the service
lb, exists, err = az.selectLoadBalancer(clusterName, service, &lbs, nodes)
if err != nil {
return nil, nil, false, err
}

lb, lbNeedsUpdate, reconcileErr := az.reconcileLoadBalancer(lb, nil, clusterName, service, []*v1.Node{})
if reconcileErr != nil {
return reconcileErr
return lb, nil, exists, err
}
if defaultLB == nil {
defaultLB = &network.LoadBalancer{
Name: &defaultLBName,
Location: &az.Location,
LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{},
}
if lbNeedsUpdate {
if len(*lb.FrontendIPConfigurations) > 0 {
glog.V(3).Infof("delete(%s): lb(%s) - updating", serviceName, lbName)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%q): start", *lb.Name)
respChan, errChan := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
resp := <-respChan
err := <-errChan
glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%q): end", *lb.Name)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("delete(%s) backing off: sg(%s) - updating", serviceName, az.SecurityGroupName)
retryErr := az.CreateOrUpdateLBWithRetry(lb)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("delete(%s) abort backoff: sg(%s) - updating", serviceName, az.SecurityGroupName)
}
}
if err != nil {
return err
}
}

return defaultLB, nil, false, nil
}

func (az *Cloud) getServiceLoadBalancerStatus(service *v1.Service, lb *network.LoadBalancer) (status *v1.LoadBalancerStatus, err error) {
if lb == nil {
glog.V(10).Infof("getServiceLoadBalancerStatus lb is nil")
return nil, nil
}
if lb.FrontendIPConfigurations == nil || *lb.FrontendIPConfigurations == nil {
return nil, nil
}
isInternal := requiresInternalLoadBalancer(service)
lbFrontendIPConfigName := getFrontendIPConfigName(service, subnet(service))
serviceName := getServiceName(service)
for _, ipConfiguration := range *lb.FrontendIPConfigurations {
if lbFrontendIPConfigName == *ipConfiguration.Name {
var lbIP *string
if isInternal {
lbIP = ipConfiguration.PrivateIPAddress
} else {
glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)

az.operationPollRateLimiter.Accept()
glog.V(10).Infof("LoadBalancerClient.Delete(%q): start", lbName)
respChan, errChan := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
resp := <-respChan
err := <-errChan
glog.V(10).Infof("LoadBalancerClient.Delete(%q): end", lbName)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("delete(%s) backing off: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
retryErr := az.DeleteLBWithRetry(lbName)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("delete(%s) abort backoff: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
}
if ipConfiguration.PublicIPAddress == nil {
return nil, fmt.Errorf("get(%s): lb(%s) - failed to get LB PublicIPAddress is Nil", serviceName, *lb.Name)
}
pipID := ipConfiguration.PublicIPAddress.ID
if pipID == nil {
return nil, fmt.Errorf("get(%s): lb(%s) - failed to get LB PublicIPAddress ID is Nil", serviceName, *lb.Name)
}
pipName, err := getLastSegment(*pipID)
if err != nil {
return err
return nil, fmt.Errorf("get(%s): lb(%s) - failed to get LB PublicIPAddress Name from ID(%s)", serviceName, *lb.Name, *pipID)
}
pip, existsPip, err := az.getPublicIPAddress(pipName)
if err != nil {
return nil, err
}
if existsPip {
lbIP = pip.IPAddress
}
}
}

// Public IP can be deleted after the frontend IP configuration rule is deleted.
if publicIPToCleanup != nil {
// Only delete an IP address if we created it, deducing by name.
if index := strings.LastIndex(*publicIPToCleanup, "/"); index != -1 {
managedPipName := getPublicIPName(clusterName, service)
pipName := (*publicIPToCleanup)[index+1:]
if strings.EqualFold(managedPipName, pipName) {
glog.V(5).Infof("Deleting public IP resource %q.", pipName)
err = az.ensurePublicIPDeleted(serviceName, pipName)
if err != nil {
return err
}
} else {
glog.V(5).Infof("Public IP resource %q found, but it does not match managed name %q, skip deleting.", pipName, managedPipName)
}
}
return &v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: *lbIP}}}, nil
}
}

return nil
return nil, nil
}

func (az *Cloud) determinePublicIPName(clusterName string, service *v1.Service) (string, error) {
loadBalancerIP := service.Spec.LoadBalancerIP
if len(loadBalancerIP) == 0 {
return getPublicIPName(clusterName, service), nil
}

pips, err := az.ListPIPWithRetry()
if err != nil {
return "", err
}

for _, pip := range pips {
if pip.PublicIPAddressPropertiesFormat.IPAddress != nil &&
*pip.PublicIPAddressPropertiesFormat.IPAddress == loadBalancerIP {
return *pip.Name, nil
}
}
return "", fmt.Errorf("user supplied IP Address %s was not found", loadBalancerIP)
}

func flipServiceInternalAnnotation(service *v1.Service) *v1.Service {
copyService := service.DeepCopy()
if _, ok := copyService.Annotations[ServiceAnnotationLoadBalancerInternal]; ok {
delete(copyService.Annotations, ServiceAnnotationLoadBalancerInternal)
} else {
copyService.Annotations[ServiceAnnotationLoadBalancerInternal] = "true"
}
return copyService
}

func (az *Cloud) ensurePublicIPExists(serviceName, pipName, domainNameLabel string) (*network.PublicIPAddress, error) {
@@ -494,7 +287,6 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName, domainNameLabel stri
}
}
pip.Tags = &map[string]*string{"service": &serviceName}

glog.V(3).Infof("ensure(%s): pip(%s) - creating", serviceName, *pip.Name)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("PublicIPAddressesClient.CreateOrUpdate(%q): start", *pip.Name)
@@ -523,44 +315,27 @@ func (az *Cloud) ensurePublicIPExists(serviceName, pipName, domainNameLabel stri
}

return &pip, nil

}

func (az *Cloud) ensurePublicIPDeleted(serviceName, pipName string) error {
glog.V(2).Infof("ensure(%s): pip(%s) - deleting", serviceName, pipName)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("PublicIPAddressesClient.Delete(%q): start", pipName)
resp, deleteErrChan := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
deleteErr := <-deleteErrChan
glog.V(10).Infof("PublicIPAddressesClient.Delete(%q): end", pipName) // response not read yet...
if az.CloudProviderBackoff && shouldRetryAPIRequest(<-resp, deleteErr) {
glog.V(2).Infof("ensure(%s) backing off: pip(%s) - deleting", serviceName, pipName)
retryErr := az.DeletePublicIPWithRetry(pipName)
if retryErr != nil {
glog.V(2).Infof("ensure(%s) abort backoff: pip(%s) - deleting", serviceName, pipName)
return retryErr
}
}
_, realErr := checkResourceExistsFromError(deleteErr)
if realErr != nil {
return nil
}
return nil
}

// This ensures the load balancer exists and the frontend IP config is set up.
// This also reconciles the Service's Ports with the LoadBalancer config.
// This entails adding rules/probes for expected Ports and removing stale rules/ports.
func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfigurationProperties *network.FrontendIPConfigurationPropertiesFormat, clusterName string, service *v1.Service, nodes []*v1.Node) (network.LoadBalancer, bool, error) {
// nodes only used if wantLB is true
func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node, wantLb bool) (*network.LoadBalancer, error) {
isInternal := requiresInternalLoadBalancer(service)
lbName := getLoadBalancerName(clusterName, isInternal)
serviceName := getServiceName(service)
glog.V(2).Infof("reconcileLoadBalancer(%s) - wantLB(%t): started", serviceName, wantLb)
lb, _, _, err := az.getServiceLoadBalancer(service, clusterName, nodes, wantLb)
if err != nil {
return nil, err
}
lbName := *lb.Name
glog.V(2).Infof("reconcileLoadBalancer(%s): lb(%s) wantLB(%t) resolved load balancer name", serviceName, lbName, wantLb)
lbFrontendIPConfigName := getFrontendIPConfigName(service, subnet(service))
lbFrontendIPConfigID := az.getFrontendIPConfigID(lbName, lbFrontendIPConfigName)
lbBackendPoolName := getBackendPoolName(clusterName)
lbBackendPoolID := az.getBackendPoolID(lbName, lbBackendPoolName)

wantLb := fipConfigurationProperties != nil
dirtyLb := false

// Ensure LoadBalancer's Backend Pool Configuration
@@ -597,6 +372,7 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration
if lb.FrontendIPConfigurations != nil {
newConfigs = *lb.FrontendIPConfigurations
}

if !wantLb {
for i := len(newConfigs) - 1; i >= 0; i-- {
config := newConfigs[i]
@@ -625,6 +401,51 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration
}
}
if !foundConfig {
// construct FrontendIPConfigurationPropertiesFormat
var fipConfigurationProperties *network.FrontendIPConfigurationPropertiesFormat
if isInternal {
subnetName := subnet(service)
if subnetName == nil {
subnetName = &az.SubnetName
}
subnet, existsSubnet, err := az.getSubnet(az.VnetName, *subnetName)
if err != nil {
return nil, err
}

if !existsSubnet {
return nil, fmt.Errorf("ensure(%s): lb(%s) - failed to get subnet: %s/%s", serviceName, lbName, az.VnetName, az.SubnetName)
}

configProperties := network.FrontendIPConfigurationPropertiesFormat{
Subnet: &subnet,
}

loadBalancerIP := service.Spec.LoadBalancerIP
if loadBalancerIP != "" {
configProperties.PrivateIPAllocationMethod = network.Static
configProperties.PrivateIPAddress = &loadBalancerIP
} else {
// We'll need to call GetLoadBalancer later to retrieve allocated IP.
configProperties.PrivateIPAllocationMethod = network.Dynamic
}

fipConfigurationProperties = &configProperties
} else {
pipName, err := az.determinePublicIPName(clusterName, service)
if err != nil {
return nil, err
}
domainNameLabel := getPublicIPLabel(service)
pip, err := az.ensurePublicIPExists(serviceName, pipName, domainNameLabel)
if err != nil {
return nil, err
}
fipConfigurationProperties = &network.FrontendIPConfigurationPropertiesFormat{
PublicIPAddress: &network.PublicIPAddress{ID: pip.ID},
}
}

newConfigs = append(newConfigs,
network.FrontendIPConfiguration{
Name: to.StringPtr(lbFrontendIPConfigName),
@@ -654,7 +475,7 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration

transportProto, _, probeProto, err := getProtocolsFromKubernetesProtocol(port.Protocol)
if err != nil {
return lb, false, err
return nil, err
}

if serviceapi.NeedsHealthCheck(service) {
@@ -662,7 +483,7 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration
// ERROR: this isn't supported
// health check (aka source ip preservation) is not
// compatible with UDP (it uses an HTTP check)
return lb, false, fmt.Errorf("services requiring health checks are incompatible with UDP ports")
return nil, fmt.Errorf("services requiring health checks are incompatible with UDP ports")
}

podPresencePath, podPresencePort := serviceapi.GetServiceHealthCheckPathPort(service)
@@ -803,24 +624,115 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration
lb.LoadBalancingRules = &updatedRules
}

return lb, dirtyLb, nil
// We don't care whether the LB exists or not.
// We only care whether there is any change to the LB, i.e. dirtyLb.
// If it does not exist and there is no change, we don't CreateOrUpdate the LB.
if dirtyLb {
if lb.FrontendIPConfigurations == nil || len(*lb.FrontendIPConfigurations) == 0 {
// When FrontendIPConfigurations is empty, we need to delete the Azure LoadBalancer resource itself,
// because deleting all FrontendIPConfigurations in an LB is not supported; we have to delete the LB itself.
glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)

az.operationPollRateLimiter.Accept()
glog.V(10).Infof("LoadBalancerClient.Delete(%q): start", lbName)
respChan, errChan := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
resp := <-respChan
err := <-errChan
glog.V(10).Infof("LoadBalancerClient.Delete(%q): end", lbName)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp, err) {
glog.V(2).Infof("delete(%s) backing off: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
retryErr := az.DeleteLBWithRetry(lbName)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("delete(%s) abort backoff: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
}
}
if err != nil {
return nil, err
}

} else {
glog.V(3).Infof("ensure(%s): lb(%s) - updating", serviceName, lbName)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%q): start", lbName)
respChan, errChan := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, lbName, *lb, nil)
resp := <-respChan
err := <-errChan
glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%q): end", lbName)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("ensure(%s) backing off: lb(%s) - updating", serviceName, lbName)
retryErr := az.CreateOrUpdateLBWithRetry(*lb)
if retryErr != nil {
glog.V(2).Infof("ensure(%s) abort backoff: lb(%s) - updating", serviceName, lbName)
return nil, retryErr
}
}
if err != nil {
return nil, err
}
}
}

if wantLb && nodes != nil {
// Add the machines to the backend pool if they're not already
availabilitySetName := az.mapLoadBalancerNameToAvailabilitySet(lbName, clusterName)
hostUpdates := make([]func() error, len(nodes))
for i, node := range nodes {
localNodeName := node.Name
f := func() error {
err := az.ensureHostInPool(serviceName, types.NodeName(localNodeName), lbBackendPoolID, availabilitySetName)
if err != nil {
return fmt.Errorf("ensure(%s): lb(%s) - failed to ensure host in pool: %q", serviceName, lbName, err)
}
return nil
}
hostUpdates[i] = f
}

errs := utilerrors.AggregateGoroutines(hostUpdates...)
if errs != nil {
return nil, utilerrors.Flatten(errs)
}
}

glog.V(2).Infof("ensure(%s): lb(%s) finished", serviceName, lbName)
return lb, nil
}

// This reconciles the Network Security Group similar to how the LB is reconciled.
// This entails adding required, missing SecurityRules and removing stale rules.
func (az *Cloud) reconcileSecurityGroup(sg network.SecurityGroup, clusterName string, service *v1.Service, lbIP *string, wantLb bool) (network.SecurityGroup, bool, error) {
func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service, lbStatus *v1.LoadBalancerStatus, wantLb bool) (*network.SecurityGroup, error) {
serviceName := getServiceName(service)
glog.V(5).Infof("ensure(%s): START clusterName=%q lbName=%q", serviceName, clusterName)

var ports []v1.ServicePort
if wantLb {
ports = service.Spec.Ports
} else {
ports = []v1.ServicePort{}
}
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("SecurityGroupsClient.Get(%q): start", az.SecurityGroupName)
sg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "")
glog.V(10).Infof("SecurityGroupsClient.Get(%q): end", az.SecurityGroupName)
if err != nil {
return nil, err
}

az.operationPollRateLimiter.Accept()
glog.V(10).Infof("SecurityGroupsClient.Get(%q): start", az.SecurityGroupName)
sg, err = az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "")
glog.V(10).Infof("SecurityGroupsClient.Get(%q): end", az.SecurityGroupName)
if err != nil {
return nil, err
}

destinationIPAddress := ""
if wantLb {
// Get lbIP since we make up NSG rules based on ingress IP
lbIP := &lbStatus.Ingress[0].IP
if lbIP == nil {
return sg, false, fmt.Errorf("No load balancer IP for setting up security rules for service %s", service.Name)
return &sg, fmt.Errorf("No load balancer IP for setting up security rules for service %s", service.Name)
}
destinationIPAddress = *lbIP
}
@@ -830,7 +742,7 @@ func (az *Cloud) reconcileSecurityGroup(sg network.SecurityGroup, clusterName st

sourceRanges, err := serviceapi.GetLoadBalancerSourceRanges(service)
if err != nil {
return sg, false, err
return nil, err
}
var sourceAddressPrefixes []string
if sourceRanges == nil || serviceapi.IsAllowAll(sourceRanges) {
@@ -847,7 +759,7 @@ func (az *Cloud) reconcileSecurityGroup(sg network.SecurityGroup, clusterName st
for i, port := range ports {
_, securityProto, _, err := getProtocolsFromKubernetesProtocol(port.Protocol)
if err != nil {
return sg, false, err
return nil, err
}
for j := range sourceAddressPrefixes {
ix := i*len(sourceAddressPrefixes) + j
@@ -902,7 +814,7 @@ func (az *Cloud) reconcileSecurityGroup(sg network.SecurityGroup, clusterName st

nextAvailablePriority, err := getNextAvailablePriority(updatedRules)
if err != nil {
return sg, false, err
return nil, err
}

expectedRule.Priority = to.Int32Ptr(nextAvailablePriority)
@@ -912,8 +824,90 @@ func (az *Cloud) reconcileSecurityGroup(sg network.SecurityGroup, clusterName st
}
if dirtySg {
sg.SecurityRules = &updatedRules
glog.V(3).Infof("ensure(%s): sg(%s) - updating", serviceName, *sg.Name)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%q): start", *sg.Name)
respChan, errChan := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil)
resp := <-respChan
err := <-errChan
glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%q): end", *sg.Name)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("ensure(%s) backing off: sg(%s) - updating", serviceName, *sg.Name)
retryErr := az.CreateOrUpdateSGWithRetry(sg)
if retryErr != nil {
glog.V(2).Infof("ensure(%s) abort backoff: sg(%s) - updating", serviceName, *sg.Name)
return nil, retryErr
}
}
if err != nil {
return nil, err
}
}
return sg, dirtySg, nil
return &sg, nil
}

// This reconciles the PublicIP resources similar to how the LB is reconciled.
// This entails adding required, missing public IP resources and removing stale ones.
func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, wantLb bool) (*network.PublicIPAddress, error) {
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
desiredPipName, err := az.determinePublicIPName(clusterName, service)
if err != nil {
return nil, err
}

pips, err := az.ListPIPWithRetry()
if err != nil {
return nil, err
}

for _, pip := range pips {
if pip.Tags != nil &&
(*pip.Tags)["service"] != nil &&
*(*pip.Tags)["service"] == serviceName {
// We need to process the pips that belong to this service
pipName := *pip.Name
if wantLb && !isInternal && pipName == desiredPipName {
// This is the only case in which we should preserve the
// public IP resource with a matching service tag.
// We do nothing here; its existence is ensured after the loop.
} else {
// We use the tag to decide which IP should be removed
glog.V(2).Infof("ensure(%s): pip(%s) - deleting", serviceName, pipName)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("PublicIPAddressesClient.Delete(%q): start", pipName)
resp, deleteErrChan := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil)
deleteErr := <-deleteErrChan
glog.V(10).Infof("PublicIPAddressesClient.Delete(%q): end", pipName) // response not read yet...
if az.CloudProviderBackoff && shouldRetryAPIRequest(<-resp, deleteErr) {
glog.V(2).Infof("ensure(%s) backing off: pip(%s) - deleting", serviceName, pipName)
retryErr := az.DeletePublicIPWithRetry(pipName)
if retryErr != nil {
glog.V(2).Infof("ensure(%s) abort backoff: pip(%s) - deleting", serviceName, pipName)
return nil, retryErr
}
}

deleteErr = ignoreStatusNotFoundFromError(deleteErr)
if deleteErr != nil {
return nil, deleteErr
}
glog.V(2).Infof("ensure(%s): pip(%s) - finished", serviceName, pipName)
}
}

}

if !isInternal && wantLb {
// Confirm desired public ip resource exists
var rpip *network.PublicIPAddress
domainNameLabel := getPublicIPLabel(service)
if rpip, err = az.ensurePublicIPExists(serviceName, desiredPipName, domainNameLabel); err != nil {
return nil, err
}
return rpip, nil
}
return nil, nil
}

func findProbe(probes []network.Probe, probe network.Probe) bool {
@@ -945,7 +939,7 @@ func findSecurityRule(rules []network.SecurityRule, rule network.SecurityRule) b

// This ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool.
func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string) error {
func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string, availabilitySetName string) error {
var machine compute.VirtualMachine
vmName := mapNodeNameToVMName(nodeName)
az.operationPollRateLimiter.Accept()
@@ -975,12 +969,12 @@ func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, b
}

// Check availability set
if az.PrimaryAvailabilitySetName != "" {
expectedAvailabilitySetName := az.getAvailabilitySetID(az.PrimaryAvailabilitySetName)
if availabilitySetName != "" {
expectedAvailabilitySetName := az.getAvailabilitySetID(availabilitySetName)
if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) {
glog.V(3).Infof(
"nicupdate(%s): skipping nic (%s) since it is not in the primaryAvailabilitSet(%s)",
serviceName, nicName, az.PrimaryAvailabilitySetName)
"nicupdate(%s): skipping nic (%s) since it is not in the availabilitSet(%s)",
serviceName, nicName, availabilitySetName)
return nil
}
}
@@ -1058,3 +1052,16 @@ func subnet(service *v1.Service) *string {

return nil
}

func getServiceLoadBalancerMode(service *v1.Service) (hasMode bool, isAuto bool, asl []string) {
mode, hasMode := service.Annotations[ServiceAnnotationLoadBalancerMode]
isAuto = strings.EqualFold(mode, ServiceAnnotationLoadBalancerAutoModeValue)
if !isAuto {
asTagList := strings.TrimSpace(mode)

// Break up list of "AS1,AS2"
asl = strings.Split(asTagList, ",")
}

return hasMode, isAuto, asl
}

68 pkg/cloudprovider/providers/azure/azure_loadbalancer.md Normal file
@@ -0,0 +1,68 @@
# Azure LoadBalancer

The way Azure defines a LoadBalancer is different from GCE or AWS. An Azure LB can have multiple frontend IP references, while GCE and AWS allow only one; if you want more, you need another LB. Because of this, the Public IP is not part of the LB in Azure, and neither is the NSG. They cannot be deleted in parallel, however: a Public IP can only be deleted after the LB's frontend IP reference to it has been removed.

The LB, Public IP, and NSG are same-tier Azure resources. We need to make sure there is no coupling between their individual ensure loops; in other words, each of them should eventually be reconciled regardless of the other resources' state, depending only on the service state.

Also, on Azure we cannot afford more than one service_controller worker: different services can operate on the same LB, so concurrent execution could produce conflicts or unexpected results. AWS and GCE apparently do not have this problem, since they use one LB per service, so no such conflict exists.

There are two load balancers per availability set: internal and external. There is a limit on the number of services that can be associated with a single load balancer.
By default the primary load balancer is selected. Services can be annotated to allow auto-selection of an available load balancer, and service annotations can also name specific availability sets that host the load balancers. Note that with auto-selection or a specific availability set selection, if the availability set is lost (for example during downtime or a cluster scale-down), the services are currently not automatically reassigned to an available load balancer.
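
A minimal sketch of that selection policy, condensing the `selectLoadBalancer` helper added in this commit (`pickLeastLoadedLB` is an illustrative name, not a function in this commit, and the snippet assumes the package's existing `fmt`, `math`, and Azure SDK `network` imports): prefer the candidate with the fewest load-balancing rules, and fail once every candidate has reached the configured `maximumLoadBalancerRuleCount`.

```go
// Illustrative helper: pick the least-loaded LB among the candidates built
// for the eligible availability sets.
func pickLeastLoadedLB(candidates []*network.LoadBalancer, maxRules int) (*network.LoadBalancer, error) {
	selectedRuleCount := math.MaxInt32
	var selected *network.LoadBalancer
	for _, lb := range candidates {
		ruleCount := 0
		if lb.LoadBalancingRules != nil {
			ruleCount = len(*lb.LoadBalancingRules)
		}
		if ruleCount < selectedRuleCount {
			selectedRuleCount = ruleCount
			selected = lb
		}
	}
	if selected == nil {
		return nil, fmt.Errorf("no load balancer candidates")
	}
	// Mirrors the MaximumLoadBalancerRuleCount guard; 0 disables the check.
	if maxRules != 0 && selectedRuleCount >= maxRules {
		return nil, fmt.Errorf("all load balancers have reached the rule limit %d", maxRules)
	}
	return selected, nil
}
```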
Service annotation for auto and specific load balancer mode:

- service.beta.kubernetes.io/azure-load-balancer-mode (__auto__|as1,as2...)
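
The sketch below mirrors how `getServiceLoadBalancerMode` in this commit interprets that annotation (`loadBalancerMode` is an illustrative name; the snippet assumes `k8s.io/api/core/v1` and `strings` are imported): no annotation means the legacy primary-availability-set behaviour, `__auto__` enables auto-selection, and anything else is read as a comma-separated availability set list.

```go
// Illustrative sketch of how the mode annotation is parsed.
func loadBalancerMode(service *v1.Service) (hasMode bool, isAuto bool, availabilitySets []string) {
	mode, hasMode := service.Annotations["service.beta.kubernetes.io/azure-load-balancer-mode"]
	isAuto = strings.EqualFold(mode, "__auto__")
	if hasMode && !isAuto {
		// Explicit mode: break up a list such as "as1,as2".
		availabilitySets = strings.Split(strings.TrimSpace(mode), ",")
	}
	return hasMode, isAuto, availabilitySets
}
```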
## Introduce Functions

- reconcileLoadBalancer(lb network.LoadBalancer, clusterName string, service *v1.Service, nodes []*v1.Node, wantLB bool) (network.LoadBalancer, error)
  - Go through the LB's properties and update them based on wantLB
  - If there is any change to the LB, no matter whether the LB exists or not
    - Call the az cloud to CreateOrUpdate this LB, or Delete it if nothing is left
  - return lb, err

- reconcileSecurityGroup(sg network.SecurityGroup, clusterName string, service *v1.Service, wantLb bool) (network.SecurityGroup, error)
  - Go through the NSG's properties and update them based on wantLb
  - If there is any change to the NSG (the NSG should always exist)
    - Call the az cloud to CreateOrUpdate this NSG
  - return sg, err

- reconcilePublicIP(pipName string, clusterName string, service *v1.Service, wantLB bool) (error) — see the sketch after this list
  - if wantLB and the LB is external,
    - ensure the Azure Public IP resource is there
    - when we ensure the Public IP, both its Name and Tag need to match the convention
    - remove dangling Public IPs whose Name or Tag matches the service, but not both
  - else, ensure the Azure Public IP resource is not there

- getServiceLoadBalancer(service *v1.Service, clusterName string, nodes []*v1.Node, wantLb bool) (lb, status, exists, error)
  - gets the load balancer for the service if it already exists
  - If wantLb is TRUE then it selects a new load balancer; the selection helps distribute the services across load balancers
  - In case the selected load balancer does not exist, it returns a network.LoadBalancer struct with added metadata (such as name and location) and existsLB set to FALSE
  - By default, the cluster default LB is returned
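
To make the reconcilePublicIP contract above concrete, here is a condensed sketch of its decision flow as implemented in this commit; error handling, rate limiting, and backoff are elided, so treat it as pseudocode rather than the exact implementation (the deletion and ensure steps are left as comments).

```go
// Condensed decision flow of reconcilePublicIP.
func reconcilePublicIPSketch(az *Cloud, clusterName string, service *v1.Service, wantLb bool) error {
	isInternal := requiresInternalLoadBalancer(service)
	serviceName := getServiceName(service)
	desiredPipName, _ := az.determinePublicIPName(clusterName, service)
	pips, _ := az.ListPIPWithRetry()
	for _, pip := range pips {
		owned := pip.Tags != nil && (*pip.Tags)["service"] != nil &&
			*(*pip.Tags)["service"] == serviceName
		keep := wantLb && !isInternal && *pip.Name == desiredPipName
		if owned && !keep {
			// Tagged for this service but no longer wanted: delete it.
		}
	}
	if !isInternal && wantLb {
		// Confirm the desired public IP exists (Name and Tag must both match).
	}
	return nil
}
```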
## Define interface behaviors

### GetLoadBalancer

- Get the LoadBalancer status, return status, error
- If it does not exist, ensure it is there

### EnsureLoadBalancer

- Reconcile the LB's related but not owned resources, such as the Public IP and NSG rules
  - Call reconcileSecurityGroup(sg, clusterName, service, true)
  - Call reconcilePublicIP(pipName, cluster, service, true)
- Reconcile the LB's related and owned resources, such as FrontEndIPConfig, Rules, and Probes
  - Call reconcileLoadBalancer(lb, clusterName, service, nodes, true)

### UpdateLoadBalancer

- No different from EnsureLoadBalancer

### EnsureLoadBalancerDeleted

- Reconcile the NSG first, before reconciling the LB, because the SG rules need the LB to still be there
  - Call reconcileSecurityGroup(sg, clusterName, service, false)
- Reconcile the LB's related and owned resources, such as FrontEndIPConfig, Rules, and Probes
  - Call reconcileLoadBalancer(lb, clusterName, service, nodes, false)
- Reconcile the LB's related but not owned resources, such as the Public IP
  - Call reconcilePublicIP(pipName, cluster, service, false)
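
A condensed sketch of how these behaviors chain the reconcile helpers together, based on the EnsureLoadBalancer and EnsureLoadBalancerDeleted bodies in this commit (error handling is elided, and the `ensure`/`ensureDeleted` names are illustrative):

```go
// Sketch of the ensure flow: clean the opposite-type LB first, then
// reconcile public IP, LB, and finally the NSG from the resolved status.
func ensure(az *Cloud, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
	// The internal-LB annotation may have been flipped, so first reconcile
	// the service out of the opposite-type load balancer.
	flipped := flipServiceInternalAnnotation(service)
	az.reconcileLoadBalancer(clusterName, flipped, nil, false /* wantLb */)

	az.reconcilePublicIP(clusterName, service, true /* wantLb */)
	lb, _ := az.reconcileLoadBalancer(clusterName, service, nodes, true /* wantLb */)
	status, _ := az.getServiceLoadBalancerStatus(service, lb)
	az.reconcileSecurityGroup(clusterName, service, status, true /* wantLb */)
	return status, nil
}

// Sketch of the delete flow: the NSG rules are keyed off the LB ingress IP,
// so they are reconciled away before the LB and the public IP.
func ensureDeleted(az *Cloud, clusterName string, service *v1.Service) error {
	az.reconcileSecurityGroup(clusterName, service, nil, false /* wantLb */)
	az.reconcileLoadBalancer(clusterName, service, nil, false /* wantLb */)
	az.reconcilePublicIP(clusterName, service, false /* wantLb */)
	return nil
}
```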
File diff suppressed because it is too large
@@ -20,7 +20,9 @@ import (
"errors"
"fmt"
"hash/crc32"
"math"
"regexp"
"sort"
"strconv"
"strings"

@@ -31,6 +33,7 @@ import (
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
)

const (
@@ -44,6 +47,12 @@ const (
loadBalancerRuleIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/loadBalancers/%s/loadBalancingRules/%s"
loadBalancerProbeIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/loadBalancers/%s/probes/%s"
securityRuleIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/networkSecurityGroups/%s/securityRules/%s"

// InternalLoadBalancerNameSuffix is the load balancer name postfix for internal load balancers
InternalLoadBalancerNameSuffix = "-internal"

// nodeLabelRole specifies the role of a node
nodeLabelRole = "kubernetes.io/role"
)

var providerIDRE = regexp.MustCompile(`^` + CloudProviderName + `://(?:.*)/Microsoft.Compute/virtualMachines/(.+)$`)
@@ -116,6 +125,197 @@ func (az *Cloud) getSecurityRuleID(securityRuleName string) string {
securityRuleName)
}

// returns the full identifier of a publicIPAddress.
func (az *Cloud) getpublicIPAddressID(pipName string) string {
return fmt.Sprintf(
publicIPAddressIDTemplate,
az.SubscriptionID,
az.ResourceGroup,
pipName)
}

// selects a load balancer for the service in the cluster
func (az *Cloud) selectLoadBalancer(clusterName string, service *v1.Service, existingLBs *[]network.LoadBalancer, nodes []*v1.Node) (selectedLB *network.LoadBalancer, existsLb bool, err error) {
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
glog.V(3).Infof("selectLoadBalancer(%s): isInternal(%s) - start", serviceName, isInternal)
availabilitySetNames, err := az.getLoadBalancerAvailabilitySetNames(service, nodes)
if err != nil {
return nil, false, err
}
glog.Infof("selectLoadBalancer(%s): isInternal(%s) - availabilitysetsname %v", serviceName, isInternal, *availabilitySetNames)
mapExistingLBs := map[string]*network.LoadBalancer{}
for lbx := range *existingLBs {
lb := (*existingLBs)[lbx]
mapExistingLBs[*lb.Name] = &lb
}
selectedLBRuleCount := math.MaxInt32
for asx := range *availabilitySetNames {
currASName := (*availabilitySetNames)[asx]
currLBName := az.getLoadBalancerName(clusterName, currASName, isInternal)
lb, ok := mapExistingLBs[currLBName]
if !ok {
// select this LB as this is a new LB and will have minimum rules;
// create a tmp lb struct to hold metadata for the new load balancer
selectedLB = &network.LoadBalancer{
Name: &currLBName,
Location: &az.Location,
LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{},
}

return selectedLB, false, nil
}

lbRules := *lb.LoadBalancingRules
currLBRuleCount := 0
if lbRules != nil {
currLBRuleCount = len(lbRules)
}
if currLBRuleCount < selectedLBRuleCount {
selectedLBRuleCount = currLBRuleCount
selectedLB = lb
}
}

if selectedLB == nil {
glog.Errorf("selectLoadBalancer service (%s) - unable to find load balancer for selected availability sets %v", serviceName, *availabilitySetNames)
return nil, false, fmt.Errorf("selectLoadBalancer (%s)- unable to find load balancer for selected availability sets %v", serviceName, *availabilitySetNames)
}
// validate that the selected LB has not exceeded the MaximumLoadBalancerRuleCount
if az.Config.MaximumLoadBalancerRuleCount != 0 && selectedLBRuleCount >= az.Config.MaximumLoadBalancerRuleCount {
err = fmt.Errorf("selectLoadBalancer service (%s) - all available load balancers have exceeded maximum rule limit %d", serviceName, selectedLBRuleCount)
glog.Error(err)
return selectedLB, existsLb, err
}

return selectedLB, existsLb, nil
}

// getLoadBalancerAvailabilitySetNames selects all possible availability sets for
// the service load balancer. If the service has no load balancer mode annotation,
// it returns the primary availability set; if the annotation exists, it returns
// the eligible availability sets.
func (az *Cloud) getLoadBalancerAvailabilitySetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error) {
hasMode, isAuto, serviceASL := getServiceLoadBalancerMode(service)
if !hasMode {
// legacy load balancer auto mode: use the primary availability set.
availabilitySetNames = &[]string{az.Config.PrimaryAvailabilitySetName}
return availabilitySetNames, nil
}
availabilitySetNames, err = az.getAgentPoolAvailabiliySets(nodes)
if err != nil {
return nil, err
}
if len(*availabilitySetNames) == 0 {
return nil, fmt.Errorf("No availability sets found for nodes, node count(%d)", len(nodes))
}
// sort the list to have deterministic selection
sort.Strings(*availabilitySetNames)
if !isAuto {
if serviceASL == nil || len(serviceASL) == 0 {
return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value")
}
// validate availability set exists
var found bool
for sasx := range serviceASL {
for asx := range *availabilitySetNames {
if strings.EqualFold((*availabilitySetNames)[asx], serviceASL[sasx]) {
found = true
serviceASL[sasx] = (*availabilitySetNames)[asx]
break
}
}
if !found {
return nil, fmt.Errorf("availability set (%s) - not found", serviceASL[sasx])
}
}
availabilitySetNames = &serviceASL
}

return availabilitySetNames, nil
}

// lists the virtual machines for the resource group and then builds
// a list of availability sets that match the nodes available to k8s
func (az *Cloud) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAs *[]string, err error) {
vms, err := az.VirtualMachineClientListWithRetry()
if err != nil {
return nil, err
}
vmNameToAvailabilitySetID := make(map[string]string, len(vms))
for vmx := range vms {
vm := vms[vmx]
if vm.AvailabilitySet != nil {
vmNameToAvailabilitySetID[*vm.Name] = *vm.AvailabilitySet.ID
}
}
availabilitySetIDs := sets.NewString()
agentPoolAs = &[]string{}
for nx := range nodes {
nodeName := (*nodes[nx]).Name
if isMasterNode(nodes[nx]) {
continue
}
asID, ok := vmNameToAvailabilitySetID[nodeName]
if !ok {
return nil, fmt.Errorf("Node (%s) - has no availability sets", nodeName)
}
if availabilitySetIDs.Has(asID) {
// already added in the list
continue
}
asName, err := getLastSegment(asID)
if err != nil {
glog.Errorf("az.getNodeAvailabilitySet(%s), getLastSegment(%s), err=%v", nodeName, asID, err)
return nil, err
}
// The AvailabilitySet ID is currently upper-cased in an indeterministic way.
// We want to keep it lower case until the ID gets fixed.
asName = strings.ToLower(asName)

*agentPoolAs = append(*agentPoolAs, asName)
}

return agentPoolAs, nil
}

func (az *Cloud) mapLoadBalancerNameToAvailabilitySet(lbName string, clusterName string) (availabilitySetName string) {
availabilitySetName = strings.TrimSuffix(lbName, InternalLoadBalancerNameSuffix)
if strings.EqualFold(clusterName, lbName) {
availabilitySetName = az.Config.PrimaryAvailabilitySetName
}

return availabilitySetName
}

// For a load balancer, all frontend IPs should reference either a subnet or a publicIpAddress.
// Thus Azure does not allow a mixed type (public and internal) load balancer.
// So we use a separate name for the internal load balancer.
// This is the name of the Azure LoadBalancer resource.
func (az *Cloud) getLoadBalancerName(clusterName string, availabilitySetName string, isInternal bool) string {
lbNamePrefix := availabilitySetName
if strings.EqualFold(availabilitySetName, az.Config.PrimaryAvailabilitySetName) {
lbNamePrefix = clusterName
}
if isInternal {
return fmt.Sprintf("%s%s", lbNamePrefix, InternalLoadBalancerNameSuffix)
}
return lbNamePrefix
}

// isMasterNode returns true if the node has a master role label.
// The master role is determined by looking for:
// * a kubernetes.io/role="master" label
func isMasterNode(node *v1.Node) bool {
for k, v := range node.Labels {
if k == nodeLabelRole && v == "master" {
return true
}
}

return false
}

// returns the deepest child's identifier from a full identifier string.
func getLastSegment(ID string) (string, error) {
parts := strings.Split(ID, "/")
@@ -179,16 +379,8 @@ func getPrimaryIPConfig(nic network.Interface) (*network.InterfaceIPConfiguratio
return nil, fmt.Errorf("failed to determine the determine primary ipconfig. nicname=%q", *nic.Name)
}

// For a load balancer, all frontend ip should reference either a subnet or publicIpAddress.
// Thus Azure do not allow mixed type (public and internal) load balancer.
// So we'd have a separate name for internal load balancer.
// This would be the name for Azure LoadBalancer resource.
func getLoadBalancerName(clusterName string, isInternal bool) string {
if isInternal {
return fmt.Sprintf("%s-internal", clusterName)
}

return clusterName
func isInternalLoadBalancer(lb *network.LoadBalancer) bool {
return strings.HasSuffix(*lb.Name, InternalLoadBalancerNameSuffix)
}

func getBackendPoolName(clusterName string) string {

@@ -40,6 +40,19 @@ func checkResourceExistsFromError(err error) (bool, error) {
return false, v
}

// If it is a StatusNotFound error, return nil;
// otherwise, return the error unchanged.
func ignoreStatusNotFoundFromError(err error) error {
if err == nil {
return nil
}
v, ok := err.(autorest.DetailedError)
if ok && v.StatusCode == http.StatusNotFound {
return nil
}
return err
}

func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachine, exists bool, err error) {
var realErr error

@@ -103,7 +116,6 @@ func (az *Cloud) getSecurityGroup() (sg network.SecurityGroup, exists bool, err

func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exists bool, err error) {
var realErr error

az.operationPollRateLimiter.Accept()
glog.V(10).Infof("LoadBalancerClient.Get(%s): start", name)
lb, err = az.LoadBalancerClient.Get(az.ResourceGroup, name, "")
@@ -121,6 +133,25 @@ func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exi
return lb, exists, err
}

func (az *Cloud) listLoadBalancers() (lbListResult network.LoadBalancerListResult, exists bool, err error) {
var realErr error

az.operationPollRateLimiter.Accept()
glog.V(10).Infof("LoadBalancerClient.List(%s): start", az.ResourceGroup)
lbListResult, err = az.LoadBalancerClient.List(az.ResourceGroup)
glog.V(10).Infof("LoadBalancerClient.List(%s): end", az.ResourceGroup)
exists, realErr = checkResourceExistsFromError(err)
if realErr != nil {
return lbListResult, false, realErr
}

if !exists {
return lbListResult, false, nil
}

return lbListResult, exists, err
}

func (az *Cloud) getPublicIPAddress(name string) (pip network.PublicIPAddress, exists bool, err error) {
var realErr error
