mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-17 15:50:10 +00:00
Update EnsureLoadBalancer, EnsureLoadBalancerDeleted for azure.
This commit is contained in:
parent
7bf15f66fe
commit
4f44bf5e5a
@ -115,18 +115,15 @@ func (az *Cloud) getPublicIPName(clusterName string, service *v1.Service) (strin
|
|||||||
func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
|
func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
|
||||||
isInternal := isLoadBalancerInternal(service)
|
isInternal := isLoadBalancerInternal(service)
|
||||||
lbName := getLoadBalancerName(clusterName, isInternal)
|
lbName := getLoadBalancerName(clusterName, isInternal)
|
||||||
pipName, err := az.getPublicIPName(clusterName, service)
|
|
||||||
if err != nil {
|
// When a client updates the internal load balancer annotation,
|
||||||
return nil, false, err
|
// the service may be switched from an internal LB to a public one, or vise versa.
|
||||||
}
|
// Here we'll firstly ensure service do not lie in the opposite LB.
|
||||||
|
az.ensureLoadBalancerDeleted(clusterName, service, !isInternal)
|
||||||
|
|
||||||
serviceName := getServiceName(service)
|
serviceName := getServiceName(service)
|
||||||
glog.V(2).Infof("ensure(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName)
|
glog.V(2).Infof("ensure(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName)
|
||||||
|
|
||||||
pip, err := az.ensurePublicIPExists(serviceName, pipName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
sg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "")
|
sg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -159,7 +156,53 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lb, lbNeedsUpdate, err := az.reconcileLoadBalancer(lb, pip, clusterName, service, nodes)
|
var lbIP *string
|
||||||
|
var fipConfigurationProperties *network.FrontendIPConfigurationPropertiesFormat
|
||||||
|
|
||||||
|
if isInternal {
|
||||||
|
subnet, existsSubnet, err := az.getSubnet(az.VnetName, az.SubnetName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !existsSubnet {
|
||||||
|
return nil, fmt.Errorf("ensure(%s): lb(%s) - failed to get subnet: %s/%s", serviceName, lbName, az.VnetName, az.SubnetName)
|
||||||
|
}
|
||||||
|
|
||||||
|
configProperties := network.FrontendIPConfigurationPropertiesFormat{
|
||||||
|
Subnet: &network.Subnet{
|
||||||
|
ID: subnet.ID,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
loadBalancerIP := service.Spec.LoadBalancerIP
|
||||||
|
if loadBalancerIP != "" {
|
||||||
|
configProperties.PrivateIPAllocationMethod = network.Static
|
||||||
|
configProperties.PrivateIPAddress = &loadBalancerIP
|
||||||
|
lbIP = &loadBalancerIP
|
||||||
|
} else {
|
||||||
|
// We'll need to call GetLoadBalancer later to retrieve allocated IP.
|
||||||
|
configProperties.PrivateIPAllocationMethod = network.Dynamic
|
||||||
|
}
|
||||||
|
|
||||||
|
fipConfigurationProperties = &configProperties
|
||||||
|
} else {
|
||||||
|
pipName, err := az.getPublicIPName(clusterName, service)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pip, err := az.ensurePublicIPExists(serviceName, pipName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lbIP = pip.IPAddress
|
||||||
|
fipConfigurationProperties = &network.FrontendIPConfigurationPropertiesFormat{
|
||||||
|
PublicIPAddress: &network.PublicIPAddress{ID: pip.ID},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lb, lbNeedsUpdate, err := az.reconcileLoadBalancer(lb, fipConfigurationProperties, clusterName, service, nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -192,9 +235,21 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
|
|||||||
return nil, utilerrors.Flatten(errs)
|
return nil, utilerrors.Flatten(errs)
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(2).Infof("ensure(%s): FINISH - %s", serviceName, *pip.IPAddress)
|
glog.V(2).Infof("ensure(%s): lb(%s) finished", serviceName, lbName)
|
||||||
|
|
||||||
|
if lbIP == nil {
|
||||||
|
lbStatus, exists, err := az.GetLoadBalancer(clusterName, service)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
return nil, fmt.Errorf("ensure(%s): lb(%s) - failed to get back load balancer", serviceName, lbName)
|
||||||
|
}
|
||||||
|
return lbStatus, nil
|
||||||
|
}
|
||||||
|
|
||||||
return &v1.LoadBalancerStatus{
|
return &v1.LoadBalancerStatus{
|
||||||
Ingress: []v1.LoadBalancerIngress{{IP: *pip.IPAddress}},
|
Ingress: []v1.LoadBalancerIngress{{IP: *lbIP}},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -215,12 +270,41 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi
|
|||||||
lbName := getLoadBalancerName(clusterName, isInternal)
|
lbName := getLoadBalancerName(clusterName, isInternal)
|
||||||
serviceName := getServiceName(service)
|
serviceName := getServiceName(service)
|
||||||
|
|
||||||
pipName, err := az.getPublicIPName(clusterName, service)
|
glog.V(2).Infof("delete(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName)
|
||||||
|
|
||||||
|
az.ensureLoadBalancerDeleted(clusterName, service, isInternal)
|
||||||
|
|
||||||
|
sg, existsSg, err := az.getSecurityGroup()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if existsSg {
|
||||||
|
reconciledSg, sgNeedsUpdate, reconcileErr := az.reconcileSecurityGroup(sg, clusterName, service)
|
||||||
|
if reconcileErr != nil {
|
||||||
|
return reconcileErr
|
||||||
|
}
|
||||||
|
if sgNeedsUpdate {
|
||||||
|
glog.V(3).Infof("delete(%s): sg(%s) - updating", serviceName, az.SecurityGroupName)
|
||||||
|
// azure-sdk-for-go introduced contraint validation which breaks the updating here if we don't set these
|
||||||
|
// to nil. This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed
|
||||||
|
sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil
|
||||||
|
sg.SecurityGroupPropertiesFormat.Subnets = nil
|
||||||
|
_, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
glog.V(2).Infof("delete(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName)
|
glog.V(2).Infof("delete(%s): FINISH", serviceName)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (az *Cloud) ensureLoadBalancerDeleted(clusterName string, service *v1.Service, isInternalLb bool) error {
|
||||||
|
lbName := getLoadBalancerName(clusterName, isInternalLb)
|
||||||
|
serviceName := getServiceName(service)
|
||||||
|
|
||||||
|
glog.V(10).Infof("ensure lb deleted: clusterName=%q, serviceName=%s, lbName=%q", clusterName, serviceName, lbName)
|
||||||
|
|
||||||
// reconcile logic is capable of fully reconcile, so we can use this to delete
|
// reconcile logic is capable of fully reconcile, so we can use this to delete
|
||||||
service.Spec.Ports = []v1.ServicePort{}
|
service.Spec.Ports = []v1.ServicePort{}
|
||||||
@ -252,36 +336,18 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sg, existsSg, err := az.getSecurityGroup()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if existsSg {
|
|
||||||
reconciledSg, sgNeedsUpdate, reconcileErr := az.reconcileSecurityGroup(sg, clusterName, service)
|
|
||||||
if reconcileErr != nil {
|
|
||||||
return reconcileErr
|
|
||||||
}
|
|
||||||
if sgNeedsUpdate {
|
|
||||||
glog.V(3).Infof("delete(%s): sg(%s) - updating", serviceName, az.SecurityGroupName)
|
|
||||||
// azure-sdk-for-go introduced contraint validation which breaks the updating here if we don't set these
|
|
||||||
// to nil. This is a workaround until https://github.com/Azure/go-autorest/issues/112 is fixed
|
|
||||||
sg.SecurityGroupPropertiesFormat.NetworkInterfaces = nil
|
|
||||||
sg.SecurityGroupPropertiesFormat.Subnets = nil
|
|
||||||
_, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Only delete an IP address if we created it.
|
// Only delete an IP address if we created it.
|
||||||
if service.Spec.LoadBalancerIP == "" {
|
if !isInternalLb && service.Spec.LoadBalancerIP == "" {
|
||||||
|
pipName, err := az.getPublicIPName(clusterName, service)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
err = az.ensurePublicIPDeleted(serviceName, pipName)
|
err = az.ensurePublicIPDeleted(serviceName, pipName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(2).Infof("delete(%s): FINISH", serviceName)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -328,7 +394,7 @@ func (az *Cloud) ensurePublicIPDeleted(serviceName, pipName string) error {
|
|||||||
// This ensures load balancer exists and the frontend ip config is setup.
|
// This ensures load balancer exists and the frontend ip config is setup.
|
||||||
// This also reconciles the Service's Ports with the LoadBalancer config.
|
// This also reconciles the Service's Ports with the LoadBalancer config.
|
||||||
// This entails adding rules/probes for expected Ports and removing stale rules/ports.
|
// This entails adding rules/probes for expected Ports and removing stale rules/ports.
|
||||||
func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, pip *network.PublicIPAddress, clusterName string, service *v1.Service, nodes []*v1.Node) (network.LoadBalancer, bool, error) {
|
func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfigurationProperties *network.FrontendIPConfigurationPropertiesFormat, clusterName string, service *v1.Service, nodes []*v1.Node) (network.LoadBalancer, bool, error) {
|
||||||
isInternal := isLoadBalancerInternal(service)
|
isInternal := isLoadBalancerInternal(service)
|
||||||
lbName := getLoadBalancerName(clusterName, isInternal)
|
lbName := getLoadBalancerName(clusterName, isInternal)
|
||||||
serviceName := getServiceName(service)
|
serviceName := getServiceName(service)
|
||||||
@ -395,11 +461,7 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, pip *network.Pub
|
|||||||
newConfigs = append(newConfigs,
|
newConfigs = append(newConfigs,
|
||||||
network.FrontendIPConfiguration{
|
network.FrontendIPConfiguration{
|
||||||
Name: to.StringPtr(lbFrontendIPConfigName),
|
Name: to.StringPtr(lbFrontendIPConfigName),
|
||||||
FrontendIPConfigurationPropertiesFormat: &network.FrontendIPConfigurationPropertiesFormat{
|
FrontendIPConfigurationPropertiesFormat: fipConfigurationProperties,
|
||||||
PublicIPAddress: &network.PublicIPAddress{
|
|
||||||
ID: pip.ID,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
glog.V(10).Infof("reconcile(%s)(%t): lb frontendconfig(%s) - adding", serviceName, wantLb, lbFrontendIPConfigName)
|
glog.V(10).Infof("reconcile(%s)(%t): lb frontendconfig(%s) - adding", serviceName, wantLb, lbFrontendIPConfigName)
|
||||||
dirtyConfigs = true
|
dirtyConfigs = true
|
||||||
|
@ -36,11 +36,11 @@ var testClusterName = "testCluster"
|
|||||||
func TestReconcileLoadBalancerAddPort(t *testing.T) {
|
func TestReconcileLoadBalancerAddPort(t *testing.T) {
|
||||||
az := getTestCloud()
|
az := getTestCloud()
|
||||||
svc := getTestService("servicea", 80)
|
svc := getTestService("servicea", 80)
|
||||||
pip := getTestPublicIP()
|
configProperties := getTestPublicFipConfigurationProperties()
|
||||||
lb := getTestLoadBalancer()
|
lb := getTestLoadBalancer()
|
||||||
nodes := []*v1.Node{}
|
nodes := []*v1.Node{}
|
||||||
|
|
||||||
lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, nodes)
|
lb, updated, err := az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc, nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %q", err)
|
t.Errorf("Unexpected error: %q", err)
|
||||||
}
|
}
|
||||||
@ -64,12 +64,12 @@ func TestReconcileLoadBalancerNodeHealth(t *testing.T) {
|
|||||||
serviceapi.BetaAnnotationExternalTraffic: serviceapi.AnnotationValueExternalTrafficLocal,
|
serviceapi.BetaAnnotationExternalTraffic: serviceapi.AnnotationValueExternalTrafficLocal,
|
||||||
serviceapi.BetaAnnotationHealthCheckNodePort: "32456",
|
serviceapi.BetaAnnotationHealthCheckNodePort: "32456",
|
||||||
}
|
}
|
||||||
pip := getTestPublicIP()
|
configProperties := getTestPublicFipConfigurationProperties()
|
||||||
lb := getTestLoadBalancer()
|
lb := getTestLoadBalancer()
|
||||||
|
|
||||||
nodes := []*v1.Node{}
|
nodes := []*v1.Node{}
|
||||||
|
|
||||||
lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, nodes)
|
lb, updated, err := az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc, nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %q", err)
|
t.Errorf("Unexpected error: %q", err)
|
||||||
}
|
}
|
||||||
@ -91,10 +91,10 @@ func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T)
|
|||||||
az := getTestCloud()
|
az := getTestCloud()
|
||||||
svc := getTestService("servicea", 80)
|
svc := getTestService("servicea", 80)
|
||||||
lb := getTestLoadBalancer()
|
lb := getTestLoadBalancer()
|
||||||
pip := getTestPublicIP()
|
configProperties := getTestPublicFipConfigurationProperties()
|
||||||
nodes := []*v1.Node{}
|
nodes := []*v1.Node{}
|
||||||
|
|
||||||
lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, nodes)
|
lb, updated, err := az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc, nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %q", err)
|
t.Errorf("Unexpected error: %q", err)
|
||||||
}
|
}
|
||||||
@ -121,13 +121,13 @@ func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T)
|
|||||||
func TestReconcileLoadBalancerRemovesPort(t *testing.T) {
|
func TestReconcileLoadBalancerRemovesPort(t *testing.T) {
|
||||||
az := getTestCloud()
|
az := getTestCloud()
|
||||||
svc := getTestService("servicea", 80, 443)
|
svc := getTestService("servicea", 80, 443)
|
||||||
pip := getTestPublicIP()
|
configProperties := getTestPublicFipConfigurationProperties()
|
||||||
nodes := []*v1.Node{}
|
nodes := []*v1.Node{}
|
||||||
|
|
||||||
existingLoadBalancer := getTestLoadBalancer(svc)
|
existingLoadBalancer := getTestLoadBalancer(svc)
|
||||||
|
|
||||||
svcUpdated := getTestService("servicea", 80)
|
svcUpdated := getTestService("servicea", 80)
|
||||||
updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &pip, testClusterName, &svcUpdated, nodes)
|
updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &configProperties, testClusterName, &svcUpdated, nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %q", err)
|
t.Errorf("Unexpected error: %q", err)
|
||||||
}
|
}
|
||||||
@ -140,17 +140,17 @@ func TestReconcileLoadBalancerMultipleServices(t *testing.T) {
|
|||||||
az := getTestCloud()
|
az := getTestCloud()
|
||||||
svc1 := getTestService("servicea", 80, 443)
|
svc1 := getTestService("servicea", 80, 443)
|
||||||
svc2 := getTestService("serviceb", 80)
|
svc2 := getTestService("serviceb", 80)
|
||||||
pip := getTestPublicIP()
|
configProperties := getTestPublicFipConfigurationProperties()
|
||||||
nodes := []*v1.Node{}
|
nodes := []*v1.Node{}
|
||||||
|
|
||||||
existingLoadBalancer := getTestLoadBalancer()
|
existingLoadBalancer := getTestLoadBalancer()
|
||||||
|
|
||||||
updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &pip, testClusterName, &svc1, nodes)
|
updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &configProperties, testClusterName, &svc1, nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %q", err)
|
t.Errorf("Unexpected error: %q", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
updatedLoadBalancer, _, err = az.reconcileLoadBalancer(updatedLoadBalancer, &pip, testClusterName, &svc2, nodes)
|
updatedLoadBalancer, _, err = az.reconcileLoadBalancer(updatedLoadBalancer, &configProperties, testClusterName, &svc2, nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %q", err)
|
t.Errorf("Unexpected error: %q", err)
|
||||||
}
|
}
|
||||||
@ -223,10 +223,10 @@ func getBackendPort(port int32) int32 {
|
|||||||
return port + 10000
|
return port + 10000
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTestPublicIP() network.PublicIPAddress {
|
func getTestPublicFipConfigurationProperties() network.FrontendIPConfigurationPropertiesFormat {
|
||||||
pip := network.PublicIPAddress{}
|
return network.FrontendIPConfigurationPropertiesFormat{
|
||||||
pip.ID = to.StringPtr("/this/is/a/public/ip/address/id")
|
PublicIPAddress: &network.PublicIPAddress{ID: to.StringPtr("/this/is/a/public/ip/address/id")},
|
||||||
return pip
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTestService(identifier string, requestedPorts ...int32) v1.Service {
|
func getTestService(identifier string, requestedPorts ...int32) v1.Service {
|
||||||
|
@ -124,3 +124,20 @@ func (az *Cloud) getPublicIPAddress(name string) (pip network.PublicIPAddress, e
|
|||||||
|
|
||||||
return pip, exists, err
|
return pip, exists, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet network.Subnet, exists bool, err error) {
|
||||||
|
var realErr error
|
||||||
|
|
||||||
|
subnet, err = az.SubnetsClient.Get(az.ResourceGroup, virtualNetworkName, subnetName, "")
|
||||||
|
|
||||||
|
exists, realErr = checkResourceExistsFromError(err)
|
||||||
|
if realErr != nil {
|
||||||
|
return subnet, false, realErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return subnet, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return subnet, exists, err
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user