From 84903d4b842b05801db389807d0effeb65fa119d Mon Sep 17 00:00:00 2001 From: Sugang Li Date: Thu, 5 May 2022 20:42:27 +0000 Subject: [PATCH] add ILB and NetLB pinhole changes --- .../gce/gce_firewall.go | 9 +++++ .../gce/gce_loadbalancer_external.go | 27 ++++++++------- .../gce/gce_loadbalancer_external_test.go | 22 +++++++++++-- .../gce/gce_loadbalancer_internal.go | 33 +++++++++++-------- .../gce/gce_loadbalancer_internal_test.go | 24 ++++++++++---- .../gce/gce_loadbalancer_utils_test.go | 2 +- 6 files changed, 81 insertions(+), 36 deletions(-) diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_firewall.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_firewall.go index 13e982da09c..afb59fe6c19 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_firewall.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_firewall.go @@ -66,3 +66,12 @@ func (g *Cloud) UpdateFirewall(f *compute.Firewall) error { mc := newFirewallMetricContext("update") return mc.Observe(g.c.Firewalls().Update(ctx, meta.GlobalKey(f.Name), f)) } + +// PatchFirewall applies the given firewall as an update to an existing service. +func (g *Cloud) PatchFirewall(f *compute.Firewall) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + + mc := newFirewallMetricContext("Patch") + return mc.Observe(g.c.Firewalls().Patch(ctx, meta.GlobalKey(f.Name), f)) +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external.go index d1a28a91582..f82b748e904 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external.go @@ -27,7 +27,7 @@ import ( "strings" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" @@ -193,13 +193,13 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, // without needing to be deleted and recreated. if firewallExists { klog.Infof("ensureExternalLoadBalancer(%s): Updating firewall.", lbRefStr) - if err := g.updateFirewall(apiService, MakeFirewallName(loadBalancerName), desc, sourceRanges, ports, hosts); err != nil { + if err := g.updateFirewall(apiService, MakeFirewallName(loadBalancerName), desc, ipAddressToUse, sourceRanges, ports, hosts); err != nil { return nil, err } klog.Infof("ensureExternalLoadBalancer(%s): Updated firewall.", lbRefStr) } else { klog.Infof("ensureExternalLoadBalancer(%s): Creating firewall.", lbRefStr) - if err := g.createFirewall(apiService, MakeFirewallName(loadBalancerName), desc, sourceRanges, ports, hosts); err != nil { + if err := g.createFirewall(apiService, MakeFirewallName(loadBalancerName), desc, ipAddressToUse, sourceRanges, ports, hosts); err != nil { return nil, err } klog.Infof("ensureExternalLoadBalancer(%s): Created firewall.", lbRefStr) @@ -921,7 +921,7 @@ func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAd return fmt.Errorf("error getting firewall for health checks: %v", err) } klog.Infof("Creating firewall %v for health checks.", fwName) - if err := g.createFirewall(svc, fwName, desc, sourceRanges, ports, hosts); err != nil { + if err := g.createFirewall(svc, fwName, desc, ipAddress, sourceRanges, ports, hosts); err != nil { return err } klog.Infof("Created firewall %v for health checks.", fwName) @@ -934,7 +934,7 @@ func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAd !equalStringSets(fw.Allowed[0].Ports, []string{strconv.Itoa(int(ports[0].Port))}) || !equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) { klog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName) - if err := g.updateFirewall(svc, fwName, desc, sourceRanges, ports, hosts); err != nil { + if err := g.updateFirewall(svc, fwName, desc, ipAddress, sourceRanges, ports, hosts); err != nil { klog.Warningf("Failed to reconcile firewall %v parameters.", fwName) return err } @@ -970,8 +970,8 @@ func createForwardingRule(s CloudForwardingRuleService, name, serviceName, regio return nil } -func (g *Cloud) createFirewall(svc *v1.Service, name, desc string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error { - firewall, err := g.firewallObject(name, desc, sourceRanges, ports, hosts) +func (g *Cloud) createFirewall(svc *v1.Service, name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error { + firewall, err := g.firewallObject(name, desc, destinationIP, sourceRanges, ports, hosts) if err != nil { return err } @@ -988,13 +988,13 @@ func (g *Cloud) createFirewall(svc *v1.Service, name, desc string, sourceRanges return nil } -func (g *Cloud) updateFirewall(svc *v1.Service, name, desc string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error { - firewall, err := g.firewallObject(name, desc, sourceRanges, ports, hosts) +func (g *Cloud) updateFirewall(svc *v1.Service, name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error { + firewall, err := g.firewallObject(name, desc, destinationIP, sourceRanges, ports, hosts) if err != nil { return err } - if err = g.UpdateFirewall(firewall); err != nil { + if err = g.PatchFirewall(firewall); err != nil { if isHTTPErrorCode(err, http.StatusConflict) { return nil } else if isForbidden(err) && g.OnXPN() { @@ -1007,7 +1007,9 @@ func (g *Cloud) updateFirewall(svc *v1.Service, name, desc string, sourceRanges return nil } -func (g *Cloud) firewallObject(name, desc string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { +func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { + // destinationIP can be empty string "" and this means that it is not set. + // GCE considers empty destinationRanges as "all" for ingress firewall-rules. // Concatenate service ports into port ranges. This help to workaround the gce firewall limitation where only // 100 ports or port ranges can be used in a firewall rule. _, portRanges, _ := getPortsAndProtocol(ports) @@ -1040,6 +1042,9 @@ func (g *Cloud) firewallObject(name, desc string, sourceRanges utilnet.IPNetSet, }, }, } + if destinationIP != "" { + firewall.DestinationRanges = []string{destinationIP} + } return firewall, nil } diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external_test.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external_test.go index 05aac53a6b3..efa0cd2c0bf 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_external_test.go @@ -29,12 +29,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" compute "google.golang.org/api/compute/v1" + v1 "k8s.io/api/core/v1" cloudprovider "k8s.io/cloud-provider" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock" - "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/json" @@ -1260,7 +1260,7 @@ func TestCreateAndUpdateFirewallSucceedsOnXPN(t *testing.T) { c := gce.c.(*cloud.MockGCE) c.MockFirewalls.InsertHook = mock.InsertFirewallsUnauthorizedErrHook - c.MockFirewalls.UpdateHook = mock.UpdateFirewallsUnauthorizedErrHook + c.MockFirewalls.PatchHook = mock.UpdateFirewallsUnauthorizedErrHook gce.onXPN = true require.True(t, gce.OnXPN()) @@ -1278,6 +1278,7 @@ func TestCreateAndUpdateFirewallSucceedsOnXPN(t *testing.T) { gce.createFirewall( svc, gce.GetLoadBalancerName(context.TODO(), "", svc), + "10.0.0.1", "A sad little firewall", ipnet, svc.Spec.Ports, @@ -1291,6 +1292,7 @@ func TestCreateAndUpdateFirewallSucceedsOnXPN(t *testing.T) { svc, gce.GetLoadBalancerName(context.TODO(), "", svc), "A sad little firewall", + "10.0.0.1", ipnet, svc.Spec.Ports, hosts) @@ -1598,6 +1600,7 @@ func TestFirewallObject(t *testing.T) { gce, err := fakeGCECloud(vals) gce.nodeTags = []string{"node-tags"} require.NoError(t, err) + dstIP := "10.0.0.1" srcRanges := []string{"10.10.0.0/24", "10.20.0.0/24"} sourceRanges, _ := utilnet.ParseIPNets(srcRanges...) fwName := "test-fw" @@ -1619,6 +1622,7 @@ func TestFirewallObject(t *testing.T) { for _, tc := range []struct { desc string sourceRanges utilnet.IPNetSet + destinationIP string svcPorts []v1.ServicePort expectedFirewall func(fw compute.Firewall) compute.Firewall }{ @@ -1643,6 +1647,18 @@ func TestFirewallObject(t *testing.T) { return fw }, }, + { + desc: "has destination IP", + sourceRanges: utilnet.IPNetSet{}, + destinationIP: dstIP, + svcPorts: []v1.ServicePort{ + {Name: "port1", Protocol: v1.ProtocolTCP, Port: int32(80), TargetPort: intstr.FromInt(80)}, + }, + expectedFirewall: func(fw compute.Firewall) compute.Firewall { + fw.DestinationRanges = []string{dstIP} + return fw + }, + }, { desc: "has multiple ports", sourceRanges: sourceRanges, @@ -1687,7 +1703,7 @@ func TestFirewallObject(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - ret, err := gce.firewallObject(fwName, fwDesc, tc.sourceRanges, tc.svcPorts, nil) + ret, err := gce.firewallObject(fwName, fwDesc, tc.destinationIP, tc.sourceRanges, tc.svcPorts, nil) require.NoError(t, err) expectedFirewall := tc.expectedFirewall(baseFw) retSrcRanges := sets.NewString(ret.SourceRanges...) diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_internal.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_internal.go index b16e5888c4b..ae8ef31ed1f 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_internal.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_internal.go @@ -166,11 +166,6 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v }() } - // Ensure firewall rules if necessary - if err = g.ensureInternalFirewalls(loadBalancerName, ipToUse, clusterID, nm, svc, strconv.Itoa(int(hcPort)), sharedHealthCheck, nodes); err != nil { - return nil, err - } - fwdRuleDescription := &forwardingRuleDescription{ServiceName: nm.String()} fwdRuleDescriptionString, err := fwdRuleDescription.marshal() if err != nil { @@ -225,17 +220,23 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v } } - // Delete the previous internal load balancer resources if necessary - if existingBackendService != nil { - g.clearPreviousInternalResources(svc, loadBalancerName, existingBackendService, backendServiceName, hcName) - } - // Get the most recent forwarding rule for the address. updatedFwdRule, err := g.GetRegionForwardingRule(loadBalancerName, g.region) if err != nil { return nil, err } + ipToUse = updatedFwdRule.IPAddress + // Ensure firewall rules if necessary + if err = g.ensureInternalFirewalls(loadBalancerName, ipToUse, clusterID, nm, svc, strconv.Itoa(int(hcPort)), sharedHealthCheck, nodes); err != nil { + return nil, err + } + + // Delete the previous internal load balancer resources if necessary + if existingBackendService != nil { + g.clearPreviousInternalResources(svc, loadBalancerName, existingBackendService, backendServiceName, hcName) + } + serviceState.InSuccess = true if options.AllowGlobalAccess { serviceState.EnabledGlobalAccess = true @@ -416,7 +417,7 @@ func (g *Cloud) teardownInternalHealthCheckAndFirewall(svc *v1.Service, hcName s return nil } -func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error { +func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error { klog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName) targetTags, err := g.GetNodeTags(nodeNames(nodes)) if err != nil { @@ -465,6 +466,10 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc string, s }, } + if destinationIP != "" { + expectedFirewall.DestinationRanges = []string{destinationIP} + } + if existingFirewall == nil { klog.V(2).Infof("ensureInternalFirewall(%v): creating firewall", fwName) err = g.CreateFirewall(expectedFirewall) @@ -481,7 +486,7 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc string, s } klog.V(2).Infof("ensureInternalFirewall(%v): updating firewall", fwName) - err = g.UpdateFirewall(expectedFirewall) + err = g.PatchFirewall(expectedFirewall) if err != nil && isForbidden(err) && g.OnXPN() { klog.V(2).Infof("ensureInternalFirewall(%v): do not have permission to update firewall rule (on XPN). Raising event.", fwName) g.raiseFirewallChangeNeededEvent(svc, FirewallToGCloudUpdateCmd(expectedFirewall, g.NetworkProjectID())) @@ -498,7 +503,7 @@ func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID s if err != nil { return err } - err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName) + err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName) if err != nil { return err } @@ -506,7 +511,7 @@ func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID s // Second firewall is for health checking nodes / services fwHCName := makeHealthCheckFirewallName(loadBalancerName, clusterID, sharedHealthCheck) hcSrcRanges := L4LoadBalancerSrcRanges() - return g.ensureInternalFirewall(svc, fwHCName, "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "") + return g.ensureInternalFirewall(svc, fwHCName, "", "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "") } func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) (*compute.HealthCheck, error) { diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_internal_test.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_internal_test.go index 15cb6acbbd8..6e2a739ed00 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_internal_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_internal_test.go @@ -34,7 +34,7 @@ import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock" "google.golang.org/api/compute/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -704,12 +704,14 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) { nodes, err := createAndInsertNodes(gce, []string{"test-node-1"}, vals.ZoneName) require.NoError(t, err) + destinationIP := "10.1.2.3" sourceRange := []string{"10.0.0.0/20"} // Manually create a firewall rule with the legacy name - lbName gce.ensureInternalFirewall( svc, lbName, "firewall with legacy name", + destinationIP, sourceRange, []string{"123"}, v1.ProtocolTCP, @@ -724,6 +726,7 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) { svc, fwName, "firewall with new name", + destinationIP, sourceRange, []string{"123", "456"}, v1.ProtocolTCP, @@ -746,6 +749,7 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) { svc, fwName, "firewall with new name", + destinationIP, sourceRange, []string{"123", "456", "789"}, v1.ProtocolTCP, @@ -776,7 +780,7 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) { c := gce.c.(*cloud.MockGCE) c.MockFirewalls.InsertHook = mock.InsertFirewallsUnauthorizedErrHook - c.MockFirewalls.UpdateHook = mock.UpdateFirewallsUnauthorizedErrHook + c.MockFirewalls.PatchHook = mock.UpdateFirewallsUnauthorizedErrHook gce.onXPN = true require.True(t, gce.OnXPN()) @@ -785,11 +789,13 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) { nodes, err := createAndInsertNodes(gce, []string{"test-node-1"}, vals.ZoneName) require.NoError(t, err) + destinationIP := "10.1.2.3" sourceRange := []string{"10.0.0.0/20"} gce.ensureInternalFirewall( svc, fwName, "A sad little firewall", + destinationIP, sourceRange, []string{"123"}, v1.ProtocolTCP, @@ -797,17 +803,18 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) { lbName) require.Nil(t, err, "Should success when XPN is on.") - checkEvent(t, recorder, FilewallChangeMsg, true) + checkEvent(t, recorder, FirewallChangeMsg, true) // Create a firewall. c.MockFirewalls.InsertHook = nil - c.MockFirewalls.UpdateHook = nil + c.MockFirewalls.PatchHook = nil gce.onXPN = false gce.ensureInternalFirewall( svc, fwName, "A sad little firewall", + destinationIP, sourceRange, []string{"123"}, v1.ProtocolTCP, @@ -820,13 +827,14 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) { gce.onXPN = true c.MockFirewalls.InsertHook = mock.InsertFirewallsUnauthorizedErrHook - c.MockFirewalls.UpdateHook = mock.UpdateFirewallsUnauthorizedErrHook + c.MockFirewalls.PatchHook = mock.UpdateFirewallsUnauthorizedErrHook // Try to update the firewall just created. gce.ensureInternalFirewall( svc, fwName, "A happy little firewall", + destinationIP, sourceRange, []string{"123"}, v1.ProtocolTCP, @@ -834,7 +842,7 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) { lbName) require.Nil(t, err, "Should success when XPN is on.") - checkEvent(t, recorder, FilewallChangeMsg, true) + checkEvent(t, recorder, FirewallChangeMsg, true) } func TestEnsureLoadBalancerDeletedSucceedsOnXPN(t *testing.T) { @@ -856,7 +864,7 @@ func TestEnsureLoadBalancerDeletedSucceedsOnXPN(t *testing.T) { err = gce.ensureInternalLoadBalancerDeleted(vals.ClusterName, vals.ClusterID, fakeLoadbalancerService(string(LBTypeInternal))) assert.NoError(t, err) - checkEvent(t, recorder, FilewallChangeMsg, true) + checkEvent(t, recorder, FirewallChangeMsg, true) } func TestEnsureInternalInstanceGroupsDeleted(t *testing.T) { @@ -1679,12 +1687,14 @@ func TestEnsureInternalFirewallPortRanges(t *testing.T) { nodes, err := createAndInsertNodes(gce, []string{"test-node-1"}, vals.ZoneName) require.NoError(t, err) + destinationIP := "10.1.2.3" sourceRange := []string{"10.0.0.0/20"} // Manually create a firewall rule with the legacy name - lbName gce.ensureInternalFirewall( svc, fwName, "firewall with legacy name", + destinationIP, sourceRange, getPortRanges(tc.Input), v1.ProtocolTCP, diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_utils_test.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_utils_test.go index a7249184346..3871eeb5fb1 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_utils_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_utils_test.go @@ -73,7 +73,7 @@ func fakeLoadbalancerServiceHelper(lbType string, annotationKey string) *v1.Serv } var ( - FilewallChangeMsg = fmt.Sprintf("%s %s %s", v1.EventTypeNormal, eventReasonManualChange, eventMsgFirewallChange) + FirewallChangeMsg = fmt.Sprintf("%s %s %s", v1.EventTypeNormal, eventReasonManualChange, eventMsgFirewallChange) ) func createAndInsertNodes(gce *Cloud, nodeNames []string, zoneName string) ([]*v1.Node, error) {