mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Attach a new finalizer in GCE ILB creation.
Add logic in service_controller to skip create/update if finalizer from a different controller is found. The newly added finalizer will be checked by other controllers implementing ILB services to determine if a given service is already being managed by service_controller. Moved finalizer check into cloudprovider code. added unit test to verify new finalizer. Modified existing unit test to create a fake service so that attach/remove finalizer step can be tested.
This commit is contained in:
parent
bbc035a66c
commit
1de2327afc
@ -33,6 +33,13 @@ rules:
|
||||
- create
|
||||
- patch
|
||||
- update
|
||||
- apiGroups:
|
||||
- ""
|
||||
resources:
|
||||
- services/status
|
||||
verbs:
|
||||
- patch
|
||||
- update
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: Role
|
||||
|
@ -804,6 +804,7 @@ func (s *Controller) addFinalizer(service *v1.Service) error {
|
||||
updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)
|
||||
|
||||
klog.V(2).Infof("Adding finalizer to service %s/%s", updated.Namespace, updated.Name)
|
||||
// TODO(87447) use PatchService from k8s.io/cloud-provider/service/helpers
|
||||
_, err := patch(s.kubeClient.CoreV1(), service, updated)
|
||||
return err
|
||||
}
|
||||
|
@ -50,11 +50,13 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/pkg/version:go_default_library",
|
||||
|
@ -41,12 +41,30 @@ import (
|
||||
const (
|
||||
// Used to list instances in all states(RUNNING and other) - https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroups/listInstances
|
||||
allInstances = "ALL"
|
||||
// ILBFinalizerV1 key is used to identify ILB services whose resources are managed by service controller.
|
||||
ILBFinalizerV1 = "gke.networking.io/l4-ilb-v1"
|
||||
// ILBFinalizerV2 is the finalizer used by newer controllers that implement Internal LoadBalancer services.
|
||||
ILBFinalizerV2 = "gke.networking.io/l4-ilb-v2"
|
||||
)
|
||||
|
||||
func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
|
||||
if g.AlphaFeatureGate.Enabled(AlphaFeatureILBSubsets) {
|
||||
if g.AlphaFeatureGate.Enabled(AlphaFeatureILBSubsets) && existingFwdRule == nil {
|
||||
// When ILBSubsets is enabled, new ILB services will not be processed here.
|
||||
// Services that have existing GCE resources created by this controller will continue to update.
|
||||
g.eventRecorder.Eventf(svc, v1.EventTypeNormal, "SkippingEnsureInternalLoadBalancer",
|
||||
"Skipped ensureInternalLoadBalancer since %s feature is enabled.", AlphaFeatureILBSubsets)
|
||||
return nil, cloudprovider.ImplementedElsewhere
|
||||
}
|
||||
if hasFinalizer(svc, ILBFinalizerV2) {
|
||||
// Another controller is handling the resources for this service.
|
||||
g.eventRecorder.Eventf(svc, v1.EventTypeNormal, "SkippingEnsureInternalLoadBalancer",
|
||||
"Skipped ensureInternalLoadBalancer as service contains '%s' finalizer.", ILBFinalizerV2)
|
||||
return nil, cloudprovider.ImplementedElsewhere
|
||||
}
|
||||
if err := addFinalizer(svc, g.client.CoreV1(), ILBFinalizerV1); err != nil {
|
||||
klog.Errorf("Failed to attach finalizer '%s' on service %s/%s - %v", ILBFinalizerV1, svc.Namespace, svc.Name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nm := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
|
||||
ports, _, protocol := getPortsAndProtocol(svc.Spec.Ports)
|
||||
@ -298,6 +316,11 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string,
|
||||
return err
|
||||
}
|
||||
|
||||
if err := removeFinalizer(svc, g.client.CoreV1(), ILBFinalizerV1); err != nil {
|
||||
klog.Errorf("Failed to remove finalizer '%s' on service %s/%s - %v", ILBFinalizerV1, svc.Namespace, svc.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -32,11 +32,12 @@ import (
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock"
|
||||
computebeta "google.golang.org/api/compute/v0.beta"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
"google.golang.org/api/compute/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/tools/record"
|
||||
cloudprovider "k8s.io/cloud-provider"
|
||||
"k8s.io/cloud-provider"
|
||||
servicehelper "k8s.io/cloud-provider/service/helpers"
|
||||
)
|
||||
|
||||
@ -155,8 +156,9 @@ func TestEnsureInternalLoadBalancer(t *testing.T) {
|
||||
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
@ -180,6 +182,10 @@ func TestEnsureInternalLoadBalancerDeprecatedAnnotation(t *testing.T) {
|
||||
}
|
||||
|
||||
svc := fakeLoadBalancerServiceDeprecatedAnnotation(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to create service %s, err %v", svc.Name, err)
|
||||
}
|
||||
status, err := gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, svc, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
@ -214,7 +220,8 @@ func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) {
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
// Create the expected resources necessary for an Internal Load Balancer
|
||||
nm := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
@ -250,6 +257,8 @@ func TestEnsureInternalLoadBalancerClearPreviousResources(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
|
||||
// Create a ForwardingRule that's missing an IP address
|
||||
@ -327,6 +336,8 @@ func TestEnsureInternalLoadBalancerHealthCheckConfigurable(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
|
||||
sharedHealthCheck := !servicehelper.RequestsOnlyLocalTraffic(svc)
|
||||
@ -357,6 +368,8 @@ func TestUpdateInternalLoadBalancerBackendServices(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
_, err = createInternalLoadBalancer(gce, svc, nil, []string{"test-node-1"}, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
@ -394,7 +407,7 @@ func TestUpdateInternalLoadBalancerBackendServices(t *testing.T) {
|
||||
bs.SelfLink,
|
||||
fmt.Sprintf("%s/regions/%s/backendServices/%s", urlBase, vals.Region, bs.Name),
|
||||
)
|
||||
assert.Equal(t, bs.Description, `{"kubernetes.io/service-name":"/"}`)
|
||||
assert.Equal(t, bs.Description, `{"kubernetes.io/service-name":"/`+svc.Name+`"}`)
|
||||
assert.Equal(
|
||||
t,
|
||||
bs.HealthChecks,
|
||||
@ -411,6 +424,8 @@ func TestUpdateInternalLoadBalancerNodes(t *testing.T) {
|
||||
node1Name := []string{"test-node-1"}
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
nodes, err := createAndInsertNodes(gce, node1Name, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -476,6 +491,8 @@ func TestEnsureInternalLoadBalancerDeleted(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
_, err = createInternalLoadBalancer(gce, svc, nil, []string{"test-node-1"}, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
@ -492,6 +509,8 @@ func TestEnsureInternalLoadBalancerDeletedTwiceDoesNotError(t *testing.T) {
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = createInternalLoadBalancer(gce, svc, nil, []string{"test-node-1"}, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.NoError(t, err)
|
||||
@ -516,7 +535,8 @@ func TestEnsureInternalLoadBalancerWithSpecialHealthCheck(t *testing.T) {
|
||||
svc.Spec.HealthCheckNodePort = healthCheckNodePort
|
||||
svc.Spec.Type = v1.ServiceTypeLoadBalancer
|
||||
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
|
||||
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, []string{nodeName}, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
@ -533,6 +553,9 @@ func TestClearPreviousInternalResources(t *testing.T) {
|
||||
vals := DefaultTestClusterValues()
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
loadBalancerName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
nm := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
|
||||
c := gce.c.(*cloud.MockGCE)
|
||||
@ -596,6 +619,8 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
vals := DefaultTestClusterValues()
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
fwName := MakeFirewallName(lbName)
|
||||
|
||||
@ -670,6 +695,8 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
vals := DefaultTestClusterValues()
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
fwName := MakeFirewallName(lbName)
|
||||
|
||||
@ -744,7 +771,10 @@ func TestEnsureLoadBalancerDeletedSucceedsOnXPN(t *testing.T) {
|
||||
gce.eventRecorder = recorder
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = createInternalLoadBalancer(gce, fakeLoadbalancerService(string(LBTypeInternal)), nil, []string{"test-node-1"}, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
_, err = createInternalLoadBalancer(gce, svc, nil, []string{"test-node-1"}, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
c.MockFirewalls.DeleteHook = mock.DeleteFirewallsUnauthorizedErrHook
|
||||
@ -766,6 +796,8 @@ func TestEnsureInternalInstanceGroupsDeleted(t *testing.T) {
|
||||
igName := makeInstanceGroupName(vals.ClusterID)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
_, err = createInternalLoadBalancer(gce, svc, nil, []string{"test-node-1"}, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
@ -884,6 +916,8 @@ func TestEnsureInternalLoadBalancerErrors(t *testing.T) {
|
||||
if tc.injectMock != nil {
|
||||
tc.injectMock(gce.c.(*cloud.MockGCE))
|
||||
}
|
||||
_, err = gce.client.CoreV1().Services(params.service.Namespace).Create(params.service)
|
||||
require.NoError(t, err)
|
||||
status, err := gce.ensureInternalLoadBalancer(
|
||||
params.clusterName,
|
||||
params.clusterID,
|
||||
@ -987,14 +1021,21 @@ func TestEnsureInternalLoadBalancerSubsetting(t *testing.T) {
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
gce.AlphaFeatureGate = NewAlphaFeatureGate([]string{AlphaFeatureILBSubsets})
|
||||
recorder := record.NewFakeRecorder(1024)
|
||||
gce.eventRecorder = recorder
|
||||
|
||||
nodeNames := []string{"test-node-1"}
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.EqualError(t, err, cloudprovider.ImplementedElsewhere.Error())
|
||||
expectedEvent := fmt.Sprintf("Normal SkippingEnsureInternalLoadBalancer Skipped ensureInternalLoadBalancer"+
|
||||
" since %s feature is enabled.", AlphaFeatureILBSubsets)
|
||||
checkEvent(t, recorder, expectedEvent, true)
|
||||
// No loadbalancer resources will be created due to the ILB Feature Gate
|
||||
assert.Empty(t, status)
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
@ -1010,6 +1051,8 @@ func TestEnsureInternalLoadBalancerSubsetting(t *testing.T) {
|
||||
assertInternalLbResources(t, gce, svc, vals, nodeNames)
|
||||
}
|
||||
|
||||
// TestEnsureInternalLoadBalancerDeletedSubsetting verifies that updates and deletion of existing ILB resources
|
||||
// continue to work, even if ILBSubsets feature is enabled.
|
||||
func TestEnsureInternalLoadBalancerDeletedSubsetting(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@ -1021,20 +1064,20 @@ func TestEnsureInternalLoadBalancerDeletedSubsetting(t *testing.T) {
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
// Enable FeatureGatee
|
||||
// Enable FeatureGate
|
||||
gce.AlphaFeatureGate = NewAlphaFeatureGate([]string{AlphaFeatureILBSubsets})
|
||||
newLBStatus := v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: "1.2.3.4"}}}
|
||||
// mock scenario where a different controller modifies status.
|
||||
svc.Status.LoadBalancer = newLBStatus
|
||||
// mock scenario where user updates the service to use a different IP, this should be processed here.
|
||||
svc.Spec.LoadBalancerIP = "1.2.3.4"
|
||||
status, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, svc, nodes)
|
||||
assert.EqualError(t, err, cloudprovider.ImplementedElsewhere.Error())
|
||||
// ensure that the status is empty
|
||||
assert.Empty(t, status)
|
||||
assert.Equal(t, svc.Status.LoadBalancer, newLBStatus)
|
||||
assert.NoError(t, err)
|
||||
// ensure that the status has the new IP
|
||||
assert.Equal(t, status.Ingress[0].IP, "1.2.3.4")
|
||||
// Invoked when service is deleted.
|
||||
err = gce.EnsureLoadBalancerDeleted(context.Background(), vals.ClusterName, svc)
|
||||
assert.NoError(t, err)
|
||||
@ -1052,6 +1095,8 @@ func TestEnsureInternalLoadBalancerGlobalAccess(t *testing.T) {
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
|
||||
@ -1115,6 +1160,8 @@ func TestEnsureInternalLoadBalancerDisableGlobalAccess(t *testing.T) {
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
svc.Annotations[ServiceAnnotationILBAllowGlobalAccess] = "true"
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
@ -1164,6 +1211,8 @@ func TestGlobalAccessChangeScheme(t *testing.T) {
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
if err != nil {
|
||||
@ -1323,6 +1372,8 @@ func TestEnsureInternalLoadBalancerCustomSubnet(t *testing.T) {
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
lbName := gce.GetLoadBalancerName(context.TODO(), "", svc)
|
||||
|
||||
@ -1464,3 +1515,65 @@ func TestEnsureInternalFirewallPortRanges(t *testing.T) {
|
||||
t.Errorf("Expected firewall rule with ports %v,got %v", tc.Result, existingPorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureInternalLoadBalancerFinalizer(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
nodeNames := []string{"test-node-1"}
|
||||
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
assertInternalLbResources(t, gce, svc, vals, nodeNames)
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
if !hasFinalizer(svc, ILBFinalizerV1) {
|
||||
t.Errorf("Expected finalizer '%s' not found in Finalizer list - %v", ILBFinalizerV1, svc.Finalizers)
|
||||
}
|
||||
|
||||
// Delete the service
|
||||
err = gce.EnsureLoadBalancerDeleted(context.Background(), vals.ClusterName, svc)
|
||||
require.NoError(t, err)
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
if hasFinalizer(svc, ILBFinalizerV1) {
|
||||
t.Errorf("Finalizer '%s' not deleted as part of ILB delete", ILBFinalizerV1)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnsureInternalLoadBalancerSkipped checks that the EnsureInternalLoadBalancer function skips creation of
|
||||
// resources when the input service has a V2 finalizer.
|
||||
func TestEnsureLoadBalancerSkipped(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
recorder := record.NewFakeRecorder(1024)
|
||||
gce.eventRecorder = recorder
|
||||
|
||||
nodeNames := []string{"test-node-1"}
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
// Add the V2 finalizer
|
||||
svc.Finalizers = append(svc.Finalizers, ILBFinalizerV2)
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
require.NoError(t, err)
|
||||
status, err := createInternalLoadBalancer(gce, svc, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.EqualError(t, err, cloudprovider.ImplementedElsewhere.Error())
|
||||
expectedEvent := fmt.Sprintf("Normal SkippingEnsureInternalLoadBalancer Skipped ensureInternalLoadBalancer"+
|
||||
" as service contains '%s' finalizer",
|
||||
ILBFinalizerV2)
|
||||
checkEvent(t, recorder, expectedEvent, true)
|
||||
// No loadbalancer resources will be created due to the ILB Feature Gate
|
||||
assert.Empty(t, status)
|
||||
assertInternalLbResourcesDeleted(t, gce, svc, vals, true)
|
||||
|
||||
}
|
||||
|
@ -83,6 +83,8 @@ func TestEnsureLoadBalancerCreatesInternalLb(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
apiService := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(apiService)
|
||||
require.NoError(t, err)
|
||||
status, err := gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
@ -126,6 +128,8 @@ func TestEnsureLoadBalancerDeletesExistingExternalLb(t *testing.T) {
|
||||
createExternalLoadBalancer(gce, apiService, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
|
||||
apiService = fakeLoadbalancerService(string(LBTypeInternal))
|
||||
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(apiService)
|
||||
require.NoError(t, err)
|
||||
status, err := gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, status.Ingress)
|
||||
@ -165,6 +169,8 @@ func TestEnsureLoadBalancerDeletedDeletesInternalLb(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
apiService := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(apiService)
|
||||
require.NoError(t, err)
|
||||
createInternalLoadBalancer(gce, apiService, nil, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
|
||||
err = gce.EnsureLoadBalancerDeleted(context.Background(), vals.ClusterName, apiService)
|
||||
|
@ -46,6 +46,7 @@ const (
|
||||
errPrefixGetTargetPool = "error getting load balancer's target pool:"
|
||||
wrongTier = "SupremeLuxury"
|
||||
errStrUnsupportedTier = "unsupported network tier: \"" + wrongTier + "\""
|
||||
fakeSvcName = "fakesvc"
|
||||
)
|
||||
|
||||
func fakeLoadbalancerService(lbType string) *v1.Service {
|
||||
@ -59,7 +60,7 @@ func fakeLoadBalancerServiceDeprecatedAnnotation(lbType string) *v1.Service {
|
||||
func fakeLoadbalancerServiceHelper(lbType string, annotationKey string) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "",
|
||||
Name: fakeSvcName,
|
||||
Annotations: map[string]string{annotationKey: lbType},
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
|
@ -34,10 +34,15 @@ import (
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
|
||||
"encoding/json"
|
||||
|
||||
"cloud.google.com/go/compute/metadata"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
"google.golang.org/api/googleapi"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
)
|
||||
|
||||
func fakeGCECloud(vals TestClusterValues) (*Cloud, error) {
|
||||
@ -45,6 +50,7 @@ func fakeGCECloud(vals TestClusterValues) (*Cloud, error) {
|
||||
|
||||
gce.AlphaFeatureGate = NewAlphaFeatureGate([]string{})
|
||||
gce.nodeInformerSynced = func() bool { return true }
|
||||
gce.client = fake.NewSimpleClientset()
|
||||
|
||||
mockGCE := gce.c.(*cloud.MockGCE)
|
||||
mockGCE.MockTargetPools.AddInstanceHook = mock.AddInstanceHook
|
||||
@ -316,3 +322,85 @@ func typeOfNetwork(network *compute.Network) netType {
|
||||
func getLocationName(project, zoneOrRegion string) string {
|
||||
return fmt.Sprintf("projects/%s/locations/%s", project, zoneOrRegion)
|
||||
}
|
||||
|
||||
func addFinalizer(service *v1.Service, kubeClient v1core.CoreV1Interface, key string) error {
|
||||
if hasFinalizer(service, key) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Make a copy so we don't mutate the shared informer cache.
|
||||
updated := service.DeepCopy()
|
||||
updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, key)
|
||||
|
||||
// TODO(87447) use PatchService from k8s.io/cloud-provider/service/helpers
|
||||
_, err := patchService(kubeClient, service, updated)
|
||||
return err
|
||||
}
|
||||
|
||||
// removeFinalizer patches the service to remove finalizer.
|
||||
func removeFinalizer(service *v1.Service, kubeClient v1core.CoreV1Interface, key string) error {
|
||||
if !hasFinalizer(service, key) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Make a copy so we don't mutate the shared informer cache.
|
||||
updated := service.DeepCopy()
|
||||
updated.ObjectMeta.Finalizers = removeString(updated.ObjectMeta.Finalizers, key)
|
||||
|
||||
_, err := patchService(kubeClient, service, updated)
|
||||
return err
|
||||
}
|
||||
|
||||
//hasFinalizer returns if the given service has the specified key in its list of finalizers.
|
||||
func hasFinalizer(service *v1.Service, key string) bool {
|
||||
for _, finalizer := range service.ObjectMeta.Finalizers {
|
||||
if finalizer == key {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// removeString returns a newly created []string that contains all items from slice that
|
||||
// are not equal to s.
|
||||
func removeString(slice []string, s string) []string {
|
||||
var newSlice []string
|
||||
for _, item := range slice {
|
||||
if item != s {
|
||||
newSlice = append(newSlice, item)
|
||||
}
|
||||
}
|
||||
return newSlice
|
||||
}
|
||||
|
||||
// patchService patches service's Status or ObjectMeta given the origin and
|
||||
// updated ones. Change to spec will be ignored.
|
||||
func patchService(c v1core.CoreV1Interface, oldSvc *v1.Service, newSvc *v1.Service) (*v1.Service, error) {
|
||||
// Reset spec to make sure only patch for Status or ObjectMeta.
|
||||
newSvc.Spec = oldSvc.Spec
|
||||
|
||||
patchBytes, err := getPatchBytes(oldSvc, newSvc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c.Services(oldSvc.Namespace).Patch(oldSvc.Name, types.StrategicMergePatchType, patchBytes, "status")
|
||||
}
|
||||
|
||||
func getPatchBytes(oldSvc *v1.Service, newSvc *v1.Service) ([]byte, error) {
|
||||
oldData, err := json.Marshal(oldSvc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to Marshal oldData for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err)
|
||||
}
|
||||
|
||||
newData, err := json.Marshal(newSvc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to Marshal newData for svc %s/%s: %v", newSvc.Namespace, newSvc.Name, err)
|
||||
}
|
||||
|
||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Service{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err)
|
||||
}
|
||||
return patchBytes, nil
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"testing"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func TestLastIPInRange(t *testing.T) {
|
||||
@ -118,3 +119,39 @@ func TestFirewallToGcloudArgs(t *testing.T) {
|
||||
t.Errorf("%q does not equal %q", got, e)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAddRemoveFinalizer tests the add/remove and hasFinalizer methods.
|
||||
func TestAddRemoveFinalizer(t *testing.T) {
|
||||
svc := fakeLoadbalancerService(string(LBTypeInternal))
|
||||
gce, err := fakeGCECloud(vals)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get GCE client, err %v", err)
|
||||
}
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(svc)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to create service %s, err %v", svc.Name, err)
|
||||
}
|
||||
|
||||
err = addFinalizer(svc, gce.client.CoreV1(), ILBFinalizerV1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add finalizer, err %v", err)
|
||||
}
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get service, err %v", err)
|
||||
}
|
||||
if !hasFinalizer(svc, ILBFinalizerV1) {
|
||||
t.Errorf("Unable to find finalizer '%s' in service %s", ILBFinalizerV1, svc.Name)
|
||||
}
|
||||
err = removeFinalizer(svc, gce.client.CoreV1(), ILBFinalizerV1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to remove finalizer, err %v", err)
|
||||
}
|
||||
svc, err = gce.client.CoreV1().Services(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get service, err %v", err)
|
||||
}
|
||||
if hasFinalizer(svc, ILBFinalizerV1) {
|
||||
t.Errorf("Failed to remove finalizer '%s' in service %s", ILBFinalizerV1, svc.Name)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user