add ILB and NetLB pinhole changes

This commit is contained in:
Sugang Li 2022-05-05 20:42:27 +00:00
parent 1d81106534
commit 84903d4b84
6 changed files with 81 additions and 36 deletions

View File

@ -66,3 +66,12 @@ func (g *Cloud) UpdateFirewall(f *compute.Firewall) error {
mc := newFirewallMetricContext("update")
return mc.Observe(g.c.Firewalls().Update(ctx, meta.GlobalKey(f.Name), f))
}
// PatchFirewall applies the given firewall as an update to an existing service.
func (g *Cloud) PatchFirewall(f *compute.Firewall) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newFirewallMetricContext("Patch")
return mc.Observe(g.c.Firewalls().Patch(ctx, meta.GlobalKey(f.Name), f))
}

View File

@ -27,7 +27,7 @@ import (
"strings"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
@ -193,13 +193,13 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string,
// without needing to be deleted and recreated.
if firewallExists {
klog.Infof("ensureExternalLoadBalancer(%s): Updating firewall.", lbRefStr)
if err := g.updateFirewall(apiService, MakeFirewallName(loadBalancerName), desc, sourceRanges, ports, hosts); err != nil {
if err := g.updateFirewall(apiService, MakeFirewallName(loadBalancerName), desc, ipAddressToUse, sourceRanges, ports, hosts); err != nil {
return nil, err
}
klog.Infof("ensureExternalLoadBalancer(%s): Updated firewall.", lbRefStr)
} else {
klog.Infof("ensureExternalLoadBalancer(%s): Creating firewall.", lbRefStr)
if err := g.createFirewall(apiService, MakeFirewallName(loadBalancerName), desc, sourceRanges, ports, hosts); err != nil {
if err := g.createFirewall(apiService, MakeFirewallName(loadBalancerName), desc, ipAddressToUse, sourceRanges, ports, hosts); err != nil {
return nil, err
}
klog.Infof("ensureExternalLoadBalancer(%s): Created firewall.", lbRefStr)
@ -921,7 +921,7 @@ func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAd
return fmt.Errorf("error getting firewall for health checks: %v", err)
}
klog.Infof("Creating firewall %v for health checks.", fwName)
if err := g.createFirewall(svc, fwName, desc, sourceRanges, ports, hosts); err != nil {
if err := g.createFirewall(svc, fwName, desc, ipAddress, sourceRanges, ports, hosts); err != nil {
return err
}
klog.Infof("Created firewall %v for health checks.", fwName)
@ -934,7 +934,7 @@ func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAd
!equalStringSets(fw.Allowed[0].Ports, []string{strconv.Itoa(int(ports[0].Port))}) ||
!equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) {
klog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName)
if err := g.updateFirewall(svc, fwName, desc, sourceRanges, ports, hosts); err != nil {
if err := g.updateFirewall(svc, fwName, desc, ipAddress, sourceRanges, ports, hosts); err != nil {
klog.Warningf("Failed to reconcile firewall %v parameters.", fwName)
return err
}
@ -970,8 +970,8 @@ func createForwardingRule(s CloudForwardingRuleService, name, serviceName, regio
return nil
}
func (g *Cloud) createFirewall(svc *v1.Service, name, desc string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error {
firewall, err := g.firewallObject(name, desc, sourceRanges, ports, hosts)
func (g *Cloud) createFirewall(svc *v1.Service, name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error {
firewall, err := g.firewallObject(name, desc, destinationIP, sourceRanges, ports, hosts)
if err != nil {
return err
}
@ -988,13 +988,13 @@ func (g *Cloud) createFirewall(svc *v1.Service, name, desc string, sourceRanges
return nil
}
func (g *Cloud) updateFirewall(svc *v1.Service, name, desc string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error {
firewall, err := g.firewallObject(name, desc, sourceRanges, ports, hosts)
func (g *Cloud) updateFirewall(svc *v1.Service, name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error {
firewall, err := g.firewallObject(name, desc, destinationIP, sourceRanges, ports, hosts)
if err != nil {
return err
}
if err = g.UpdateFirewall(firewall); err != nil {
if err = g.PatchFirewall(firewall); err != nil {
if isHTTPErrorCode(err, http.StatusConflict) {
return nil
} else if isForbidden(err) && g.OnXPN() {
@ -1007,7 +1007,9 @@ func (g *Cloud) updateFirewall(svc *v1.Service, name, desc string, sourceRanges
return nil
}
func (g *Cloud) firewallObject(name, desc string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
// destinationIP can be empty string "" and this means that it is not set.
// GCE considers empty destinationRanges as "all" for ingress firewall-rules.
// Concatenate service ports into port ranges. This help to workaround the gce firewall limitation where only
// 100 ports or port ranges can be used in a firewall rule.
_, portRanges, _ := getPortsAndProtocol(ports)
@ -1040,6 +1042,9 @@ func (g *Cloud) firewallObject(name, desc string, sourceRanges utilnet.IPNetSet,
},
},
}
if destinationIP != "" {
firewall.DestinationRanges = []string{destinationIP}
}
return firewall, nil
}

View File

@ -29,12 +29,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
compute "google.golang.org/api/compute/v1"
v1 "k8s.io/api/core/v1"
cloudprovider "k8s.io/cloud-provider"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/json"
@ -1260,7 +1260,7 @@ func TestCreateAndUpdateFirewallSucceedsOnXPN(t *testing.T) {
c := gce.c.(*cloud.MockGCE)
c.MockFirewalls.InsertHook = mock.InsertFirewallsUnauthorizedErrHook
c.MockFirewalls.UpdateHook = mock.UpdateFirewallsUnauthorizedErrHook
c.MockFirewalls.PatchHook = mock.UpdateFirewallsUnauthorizedErrHook
gce.onXPN = true
require.True(t, gce.OnXPN())
@ -1278,6 +1278,7 @@ func TestCreateAndUpdateFirewallSucceedsOnXPN(t *testing.T) {
gce.createFirewall(
svc,
gce.GetLoadBalancerName(context.TODO(), "", svc),
"10.0.0.1",
"A sad little firewall",
ipnet,
svc.Spec.Ports,
@ -1291,6 +1292,7 @@ func TestCreateAndUpdateFirewallSucceedsOnXPN(t *testing.T) {
svc,
gce.GetLoadBalancerName(context.TODO(), "", svc),
"A sad little firewall",
"10.0.0.1",
ipnet,
svc.Spec.Ports,
hosts)
@ -1598,6 +1600,7 @@ func TestFirewallObject(t *testing.T) {
gce, err := fakeGCECloud(vals)
gce.nodeTags = []string{"node-tags"}
require.NoError(t, err)
dstIP := "10.0.0.1"
srcRanges := []string{"10.10.0.0/24", "10.20.0.0/24"}
sourceRanges, _ := utilnet.ParseIPNets(srcRanges...)
fwName := "test-fw"
@ -1619,6 +1622,7 @@ func TestFirewallObject(t *testing.T) {
for _, tc := range []struct {
desc string
sourceRanges utilnet.IPNetSet
destinationIP string
svcPorts []v1.ServicePort
expectedFirewall func(fw compute.Firewall) compute.Firewall
}{
@ -1643,6 +1647,18 @@ func TestFirewallObject(t *testing.T) {
return fw
},
},
{
desc: "has destination IP",
sourceRanges: utilnet.IPNetSet{},
destinationIP: dstIP,
svcPorts: []v1.ServicePort{
{Name: "port1", Protocol: v1.ProtocolTCP, Port: int32(80), TargetPort: intstr.FromInt(80)},
},
expectedFirewall: func(fw compute.Firewall) compute.Firewall {
fw.DestinationRanges = []string{dstIP}
return fw
},
},
{
desc: "has multiple ports",
sourceRanges: sourceRanges,
@ -1687,7 +1703,7 @@ func TestFirewallObject(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
ret, err := gce.firewallObject(fwName, fwDesc, tc.sourceRanges, tc.svcPorts, nil)
ret, err := gce.firewallObject(fwName, fwDesc, tc.destinationIP, tc.sourceRanges, tc.svcPorts, nil)
require.NoError(t, err)
expectedFirewall := tc.expectedFirewall(baseFw)
retSrcRanges := sets.NewString(ret.SourceRanges...)

View File

@ -166,11 +166,6 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
}()
}
// Ensure firewall rules if necessary
if err = g.ensureInternalFirewalls(loadBalancerName, ipToUse, clusterID, nm, svc, strconv.Itoa(int(hcPort)), sharedHealthCheck, nodes); err != nil {
return nil, err
}
fwdRuleDescription := &forwardingRuleDescription{ServiceName: nm.String()}
fwdRuleDescriptionString, err := fwdRuleDescription.marshal()
if err != nil {
@ -225,17 +220,23 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
}
}
// Delete the previous internal load balancer resources if necessary
if existingBackendService != nil {
g.clearPreviousInternalResources(svc, loadBalancerName, existingBackendService, backendServiceName, hcName)
}
// Get the most recent forwarding rule for the address.
updatedFwdRule, err := g.GetRegionForwardingRule(loadBalancerName, g.region)
if err != nil {
return nil, err
}
ipToUse = updatedFwdRule.IPAddress
// Ensure firewall rules if necessary
if err = g.ensureInternalFirewalls(loadBalancerName, ipToUse, clusterID, nm, svc, strconv.Itoa(int(hcPort)), sharedHealthCheck, nodes); err != nil {
return nil, err
}
// Delete the previous internal load balancer resources if necessary
if existingBackendService != nil {
g.clearPreviousInternalResources(svc, loadBalancerName, existingBackendService, backendServiceName, hcName)
}
serviceState.InSuccess = true
if options.AllowGlobalAccess {
serviceState.EnabledGlobalAccess = true
@ -416,7 +417,7 @@ func (g *Cloud) teardownInternalHealthCheckAndFirewall(svc *v1.Service, hcName s
return nil
}
func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error {
func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error {
klog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName)
targetTags, err := g.GetNodeTags(nodeNames(nodes))
if err != nil {
@ -465,6 +466,10 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc string, s
},
}
if destinationIP != "" {
expectedFirewall.DestinationRanges = []string{destinationIP}
}
if existingFirewall == nil {
klog.V(2).Infof("ensureInternalFirewall(%v): creating firewall", fwName)
err = g.CreateFirewall(expectedFirewall)
@ -481,7 +486,7 @@ func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc string, s
}
klog.V(2).Infof("ensureInternalFirewall(%v): updating firewall", fwName)
err = g.UpdateFirewall(expectedFirewall)
err = g.PatchFirewall(expectedFirewall)
if err != nil && isForbidden(err) && g.OnXPN() {
klog.V(2).Infof("ensureInternalFirewall(%v): do not have permission to update firewall rule (on XPN). Raising event.", fwName)
g.raiseFirewallChangeNeededEvent(svc, FirewallToGCloudUpdateCmd(expectedFirewall, g.NetworkProjectID()))
@ -498,7 +503,7 @@ func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID s
if err != nil {
return err
}
err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName)
err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName)
if err != nil {
return err
}
@ -506,7 +511,7 @@ func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID s
// Second firewall is for health checking nodes / services
fwHCName := makeHealthCheckFirewallName(loadBalancerName, clusterID, sharedHealthCheck)
hcSrcRanges := L4LoadBalancerSrcRanges()
return g.ensureInternalFirewall(svc, fwHCName, "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "")
return g.ensureInternalFirewall(svc, fwHCName, "", "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "")
}
func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) (*compute.HealthCheck, error) {

View File

@ -34,7 +34,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock"
"google.golang.org/api/compute/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
@ -704,12 +704,14 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) {
nodes, err := createAndInsertNodes(gce, []string{"test-node-1"}, vals.ZoneName)
require.NoError(t, err)
destinationIP := "10.1.2.3"
sourceRange := []string{"10.0.0.0/20"}
// Manually create a firewall rule with the legacy name - lbName
gce.ensureInternalFirewall(
svc,
lbName,
"firewall with legacy name",
destinationIP,
sourceRange,
[]string{"123"},
v1.ProtocolTCP,
@ -724,6 +726,7 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) {
svc,
fwName,
"firewall with new name",
destinationIP,
sourceRange,
[]string{"123", "456"},
v1.ProtocolTCP,
@ -746,6 +749,7 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) {
svc,
fwName,
"firewall with new name",
destinationIP,
sourceRange,
[]string{"123", "456", "789"},
v1.ProtocolTCP,
@ -776,7 +780,7 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) {
c := gce.c.(*cloud.MockGCE)
c.MockFirewalls.InsertHook = mock.InsertFirewallsUnauthorizedErrHook
c.MockFirewalls.UpdateHook = mock.UpdateFirewallsUnauthorizedErrHook
c.MockFirewalls.PatchHook = mock.UpdateFirewallsUnauthorizedErrHook
gce.onXPN = true
require.True(t, gce.OnXPN())
@ -785,11 +789,13 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) {
nodes, err := createAndInsertNodes(gce, []string{"test-node-1"}, vals.ZoneName)
require.NoError(t, err)
destinationIP := "10.1.2.3"
sourceRange := []string{"10.0.0.0/20"}
gce.ensureInternalFirewall(
svc,
fwName,
"A sad little firewall",
destinationIP,
sourceRange,
[]string{"123"},
v1.ProtocolTCP,
@ -797,17 +803,18 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) {
lbName)
require.Nil(t, err, "Should success when XPN is on.")
checkEvent(t, recorder, FilewallChangeMsg, true)
checkEvent(t, recorder, FirewallChangeMsg, true)
// Create a firewall.
c.MockFirewalls.InsertHook = nil
c.MockFirewalls.UpdateHook = nil
c.MockFirewalls.PatchHook = nil
gce.onXPN = false
gce.ensureInternalFirewall(
svc,
fwName,
"A sad little firewall",
destinationIP,
sourceRange,
[]string{"123"},
v1.ProtocolTCP,
@ -820,13 +827,14 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) {
gce.onXPN = true
c.MockFirewalls.InsertHook = mock.InsertFirewallsUnauthorizedErrHook
c.MockFirewalls.UpdateHook = mock.UpdateFirewallsUnauthorizedErrHook
c.MockFirewalls.PatchHook = mock.UpdateFirewallsUnauthorizedErrHook
// Try to update the firewall just created.
gce.ensureInternalFirewall(
svc,
fwName,
"A happy little firewall",
destinationIP,
sourceRange,
[]string{"123"},
v1.ProtocolTCP,
@ -834,7 +842,7 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) {
lbName)
require.Nil(t, err, "Should success when XPN is on.")
checkEvent(t, recorder, FilewallChangeMsg, true)
checkEvent(t, recorder, FirewallChangeMsg, true)
}
func TestEnsureLoadBalancerDeletedSucceedsOnXPN(t *testing.T) {
@ -856,7 +864,7 @@ func TestEnsureLoadBalancerDeletedSucceedsOnXPN(t *testing.T) {
err = gce.ensureInternalLoadBalancerDeleted(vals.ClusterName, vals.ClusterID, fakeLoadbalancerService(string(LBTypeInternal)))
assert.NoError(t, err)
checkEvent(t, recorder, FilewallChangeMsg, true)
checkEvent(t, recorder, FirewallChangeMsg, true)
}
func TestEnsureInternalInstanceGroupsDeleted(t *testing.T) {
@ -1679,12 +1687,14 @@ func TestEnsureInternalFirewallPortRanges(t *testing.T) {
nodes, err := createAndInsertNodes(gce, []string{"test-node-1"}, vals.ZoneName)
require.NoError(t, err)
destinationIP := "10.1.2.3"
sourceRange := []string{"10.0.0.0/20"}
// Manually create a firewall rule with the legacy name - lbName
gce.ensureInternalFirewall(
svc,
fwName,
"firewall with legacy name",
destinationIP,
sourceRange,
getPortRanges(tc.Input),
v1.ProtocolTCP,

View File

@ -73,7 +73,7 @@ func fakeLoadbalancerServiceHelper(lbType string, annotationKey string) *v1.Serv
}
var (
FilewallChangeMsg = fmt.Sprintf("%s %s %s", v1.EventTypeNormal, eventReasonManualChange, eventMsgFirewallChange)
FirewallChangeMsg = fmt.Sprintf("%s %s %s", v1.EventTypeNormal, eventReasonManualChange, eventMsgFirewallChange)
)
func createAndInsertNodes(gce *Cloud, nodeNames []string, zoneName string) ([]*v1.Node, error) {