Fix client IP preservation for NodePort service with protocol SCTP

The iptables rule that matches kubeNodePortLocalSetSCTP must be inserted
before the one matches kubeNodePortSetSCTP, otherwise all SCTP traffic
would be masqueraded regardless of whether its ExternalTrafficPolicy is
Local or not.

To cover the case in tests, the patch adds rule order validation to
checkIptables.
This commit is contained in:
Quan Tian 2021-09-03 22:40:15 +08:00
parent fcd713e5cd
commit 9ee3ae748b
2 changed files with 174 additions and 6 deletions

View File

@ -191,8 +191,8 @@ var ipsetWithIptablesChain = []struct {
{kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolTCP},
{kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP},
{kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolUDP},
{kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
{kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP},
{kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
}
// In IPVS proxy mode, the following flags need to be set

View File

@ -654,9 +654,15 @@ func TestNodePortIPv4(t *testing.T) {
expectedIptablesChains: netlinktest.ExpectedIptablesChain{
string(KubeNodePortChain): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetUDP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
string(kubeServicesChain): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}},
},
},
@ -865,6 +871,123 @@ func TestNodePortIPv4(t *testing.T) {
},
},
},
{
name: "node port service with protocol sctp and externalTrafficPolicy local",
services: []*v1.Service{
makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = "10.20.30.41"
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: int32(80),
Protocol: v1.ProtocolSCTP,
NodePort: int32(3001),
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
}),
},
endpoints: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"10.180.0.1"},
NodeName: utilpointer.StringPtr(testHostname),
}, {
Addresses: []string{"10.180.1.1"},
NodeName: utilpointer.StringPtr("otherHost"),
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.String("p80"),
Port: utilpointer.Int32(80),
Protocol: &sctpProtocol,
}}
}),
},
nodeIPs: []net.IP{
netutils.ParseIPSloppy("100.101.102.103"),
},
nodePortAddresses: []string{},
expectedIPVS: &ipvstest.FakeIPVS{
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
Address: netutils.ParseIPSloppy("10.20.30.41"),
Protocol: "SCTP",
Port: uint16(80),
Scheduler: "rr",
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
Address: netutils.ParseIPSloppy("100.101.102.103"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
},
Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
{
Address: netutils.ParseIPSloppy("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
{
Address: netutils.ParseIPSloppy("10.180.1.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: netutils.ParseIPSloppy("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
},
},
expectedIPSets: netlinktest.ExpectedIPSet{
kubeNodePortSetSCTP: {
{
IP: "100.101.102.103",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
},
kubeNodePortLocalSetSCTP: {
{
IP: "100.101.102.103",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
},
},
expectedIptablesChains: netlinktest.ExpectedIptablesChain{
string(KubeNodePortChain): {{
JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetSCTP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetSCTP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
},
},
}
for _, test := range tests {
@ -1822,6 +1945,14 @@ func TestLoadBalancer(t *testing.T) {
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}, {
JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
}},
string(kubeLoadBalancerSet): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: "",
@ -1913,12 +2044,18 @@ func TestOnlyLocalNodePorts(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}},
string(KubeNodePortChain): {{
JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
}
checkIptables(t, ipt, epIpt)
@ -1987,6 +2124,10 @@ func TestHealthCheckNodePort(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(KubeNodePortChain): {{
JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
}
@ -2076,6 +2217,14 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}, {
JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
}},
string(KubeLoadBalancerChain): {{
JumpChain: string(KubeFireWallChain), MatchSet: kubeLoadbalancerFWSet,
@ -2148,9 +2297,14 @@ func TestAcceptIPVSTraffic(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {
{JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet},
{JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet},
{JumpChain: string(KubeMarkMasqChain), MatchSet: kubeExternalIPSet},
{JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With externalTrafficOnlyArgs
{JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With dstLocalOnlyArgs
{JumpChain: string(KubeNodePortChain), MatchSet: ""},
{JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet},
{JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet},
{JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet},
},
}
checkIptables(t, ipt, epIpt)
@ -2242,6 +2396,14 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}, {
JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
}},
string(KubeLoadBalancerChain): {{
JumpChain: "RETURN", MatchSet: kubeLoadBalancerLocalSet,
@ -3658,13 +3820,19 @@ func hasMasqRandomFully(rules []iptablestest.Rule) bool {
return false
}
// checkIptabless to check expected iptables chain and rules
// checkIptables to check expected iptables chain and rules. The got rules must have same number and order as the
// expected rules.
func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) {
for epChain, epRules := range epIpt {
rules := ipt.GetRules(epChain)
for _, epRule := range epRules {
if !hasJump(rules, epRule.JumpChain, epRule.MatchSet) {
t.Errorf("Didn't find jump from chain %v match set %v to %v", epChain, epRule.MatchSet, epRule.JumpChain)
if len(rules) != len(epRules) {
t.Errorf("Expected %d iptables rule in chain %s, got %d", len(epRules), epChain, len(rules))
continue
}
for i, epRule := range epRules {
rule := rules[i]
if rule[iptablestest.Jump] != epRule.JumpChain || !strings.Contains(rule[iptablestest.MatchSet], epRule.MatchSet) {
t.Errorf("Expected MatchSet=%s JumpChain=%s, got MatchSet=%s JumpChain=%s", epRule.MatchSet, epRule.JumpChain, rule[iptablestest.MatchSet], rule[iptablestest.Jump])
}
}
}