From 9ee3ae748b013a53f0cc8941ff01bc351453cdf4 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Fri, 3 Sep 2021 22:40:15 +0800 Subject: [PATCH] Fix client IP preservation for NodePort service with protocol SCTP The iptables rule that matches kubeNodePortLocalSetSCTP must be inserted before the one matches kubeNodePortSetSCTP, otherwise all SCTP traffic would be masqueraded regardless of whether its ExternalTrafficPolicy is Local or not. To cover the case in tests, the patch adds rule order validation to checkIptables. --- pkg/proxy/ipvs/proxier.go | 2 +- pkg/proxy/ipvs/proxier_test.go | 178 ++++++++++++++++++++++++++++++++- 2 files changed, 174 insertions(+), 6 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 5e7bd4c645e..86b7db41072 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -191,8 +191,8 @@ var ipsetWithIptablesChain = []struct { {kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolTCP}, {kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP}, {kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolUDP}, - {kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP}, {kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP}, + {kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP}, } // In IPVS proxy mode, the following flags need to be set diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index a7bd9e51fa8..0b83fac558c 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -654,9 +654,15 @@ func TestNodePortIPv4(t *testing.T) { expectedIptablesChains: netlinktest.ExpectedIptablesChain{ string(KubeNodePortChain): {{ JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetUDP, + }, { + JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, string(kubeServicesChain): {{ + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, }}, }, }, @@ -865,6 +871,123 @@ func TestNodePortIPv4(t *testing.T) { }, }, }, + { + name: "node port service with protocol sctp and externalTrafficPolicy local", + services: []*v1.Service{ + makeTestService("ns1", "svc1", func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = "10.20.30.41" + svc.Spec.Ports = []v1.ServicePort{{ + Name: "p80", + Port: int32(80), + Protocol: v1.ProtocolSCTP, + NodePort: int32(3001), + }} + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + }), + }, + endpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"10.180.0.1"}, + NodeName: utilpointer.StringPtr(testHostname), + }, { + Addresses: []string{"10.180.1.1"}, + NodeName: utilpointer.StringPtr("otherHost"), + }} + eps.Ports = []discovery.EndpointPort{{ + Name: utilpointer.String("p80"), + Port: utilpointer.Int32(80), + Protocol: &sctpProtocol, + }} + }), + }, + nodeIPs: []net.IP{ + netutils.ParseIPSloppy("100.101.102.103"), + }, + nodePortAddresses: []string{}, + expectedIPVS: &ipvstest.FakeIPVS{ + Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ + { + IP: "10.20.30.41", + Port: 80, + Protocol: "SCTP", + }: { + Address: netutils.ParseIPSloppy("10.20.30.41"), + Protocol: "SCTP", + Port: uint16(80), + Scheduler: "rr", + }, + { + IP: "100.101.102.103", + Port: 3001, + Protocol: "SCTP", + }: { + Address: netutils.ParseIPSloppy("100.101.102.103"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + }, + Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ + { + IP: "10.20.30.41", + Port: 80, + Protocol: "SCTP", + }: { + { + Address: netutils.ParseIPSloppy("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + { + Address: netutils.ParseIPSloppy("10.180.1.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "100.101.102.103", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: netutils.ParseIPSloppy("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + }, + }, + expectedIPSets: netlinktest.ExpectedIPSet{ + kubeNodePortSetSCTP: { + { + IP: "100.101.102.103", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + }, + kubeNodePortLocalSetSCTP: { + { + IP: "100.101.102.103", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + }, + }, + expectedIptablesChains: netlinktest.ExpectedIptablesChain{ + string(KubeNodePortChain): {{ + JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetSCTP, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetSCTP, + }, { + JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, + }}, + }, + }, } for _, test := range tests { @@ -1822,6 +1945,14 @@ func TestLoadBalancer(t *testing.T) { epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { + JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, + }, { + JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet, }}, string(kubeLoadBalancerSet): {{ JumpChain: string(KubeMarkMasqChain), MatchSet: "", @@ -1913,12 +2044,18 @@ func TestOnlyLocalNodePorts(t *testing.T) { // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, }}, string(KubeNodePortChain): {{ JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP, }, { JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP, + }, { + JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, } checkIptables(t, ipt, epIpt) @@ -1987,6 +2124,10 @@ func TestHealthCheckNodePort(t *testing.T) { // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(KubeNodePortChain): {{ + JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP, + }, { JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, } @@ -2076,6 +2217,14 @@ func TestLoadBalanceSourceRanges(t *testing.T) { epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { + JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, + }, { + JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet, }}, string(KubeLoadBalancerChain): {{ JumpChain: string(KubeFireWallChain), MatchSet: kubeLoadbalancerFWSet, @@ -2148,9 +2297,14 @@ func TestAcceptIPVSTraffic(t *testing.T) { // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): { + {JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet}, + {JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet}, + {JumpChain: string(KubeMarkMasqChain), MatchSet: kubeExternalIPSet}, + {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With externalTrafficOnlyArgs + {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With dstLocalOnlyArgs + {JumpChain: string(KubeNodePortChain), MatchSet: ""}, {JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet}, {JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet}, - {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, }, } checkIptables(t, ipt, epIpt) @@ -2242,6 +2396,14 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { + JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, + }, { + JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet, }}, string(KubeLoadBalancerChain): {{ JumpChain: "RETURN", MatchSet: kubeLoadBalancerLocalSet, @@ -3658,13 +3820,19 @@ func hasMasqRandomFully(rules []iptablestest.Rule) bool { return false } -// checkIptabless to check expected iptables chain and rules +// checkIptables to check expected iptables chain and rules. The got rules must have same number and order as the +// expected rules. func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) { for epChain, epRules := range epIpt { rules := ipt.GetRules(epChain) - for _, epRule := range epRules { - if !hasJump(rules, epRule.JumpChain, epRule.MatchSet) { - t.Errorf("Didn't find jump from chain %v match set %v to %v", epChain, epRule.MatchSet, epRule.JumpChain) + if len(rules) != len(epRules) { + t.Errorf("Expected %d iptables rule in chain %s, got %d", len(epRules), epChain, len(rules)) + continue + } + for i, epRule := range epRules { + rule := rules[i] + if rule[iptablestest.Jump] != epRule.JumpChain || !strings.Contains(rule[iptablestest.MatchSet], epRule.MatchSet) { + t.Errorf("Expected MatchSet=%s JumpChain=%s, got MatchSet=%s JumpChain=%s", epRule.MatchSet, epRule.JumpChain, rule[iptablestest.MatchSet], rule[iptablestest.Jump]) } } }