Merge pull request #104756 from tnqn/ipvs-sctp-masquerade

Fix client IP preservation for NodePort service with protocol SCTP
This commit is contained in:
Kubernetes Prow Robot 2021-09-09 15:34:56 -07:00 committed by GitHub
commit a402f1753c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 174 additions and 6 deletions

View File

@ -191,8 +191,8 @@ var ipsetWithIptablesChain = []struct {
{kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolTCP},
{kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP},
{kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolUDP},
{kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
{kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP},
{kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
}
// In IPVS proxy mode, the following flags need to be set

View File

@ -654,9 +654,15 @@ func TestNodePortIPv4(t *testing.T) {
expectedIptablesChains: netlinktest.ExpectedIptablesChain{
string(KubeNodePortChain): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetUDP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
string(kubeServicesChain): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}},
},
},
@ -865,6 +871,123 @@ func TestNodePortIPv4(t *testing.T) {
},
},
},
{
name: "node port service with protocol sctp and externalTrafficPolicy local",
services: []*v1.Service{
makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = "10.20.30.41"
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: int32(80),
Protocol: v1.ProtocolSCTP,
NodePort: int32(3001),
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
}),
},
endpoints: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"10.180.0.1"},
NodeName: utilpointer.StringPtr(testHostname),
}, {
Addresses: []string{"10.180.1.1"},
NodeName: utilpointer.StringPtr("otherHost"),
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.String("p80"),
Port: utilpointer.Int32(80),
Protocol: &sctpProtocol,
}}
}),
},
nodeIPs: []net.IP{
netutils.ParseIPSloppy("100.101.102.103"),
},
nodePortAddresses: []string{},
expectedIPVS: &ipvstest.FakeIPVS{
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
Address: netutils.ParseIPSloppy("10.20.30.41"),
Protocol: "SCTP",
Port: uint16(80),
Scheduler: "rr",
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
Address: netutils.ParseIPSloppy("100.101.102.103"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
},
Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
{
Address: netutils.ParseIPSloppy("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
{
Address: netutils.ParseIPSloppy("10.180.1.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: netutils.ParseIPSloppy("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
},
},
expectedIPSets: netlinktest.ExpectedIPSet{
kubeNodePortSetSCTP: {
{
IP: "100.101.102.103",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
},
kubeNodePortLocalSetSCTP: {
{
IP: "100.101.102.103",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
},
},
expectedIptablesChains: netlinktest.ExpectedIptablesChain{
string(KubeNodePortChain): {{
JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetSCTP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetSCTP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
},
},
}
for _, test := range tests {
@ -1822,6 +1945,14 @@ func TestLoadBalancer(t *testing.T) {
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}, {
JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
}},
string(kubeLoadBalancerSet): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: "",
@ -1913,12 +2044,18 @@ func TestOnlyLocalNodePorts(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}},
string(KubeNodePortChain): {{
JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
}
checkIptables(t, ipt, epIpt)
@ -1987,6 +2124,10 @@ func TestHealthCheckNodePort(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(KubeNodePortChain): {{
JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP,
}, {
JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet,
}},
}
@ -2076,6 +2217,14 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}, {
JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
}},
string(KubeLoadBalancerChain): {{
JumpChain: string(KubeFireWallChain), MatchSet: kubeLoadbalancerFWSet,
@ -2148,9 +2297,14 @@ func TestAcceptIPVSTraffic(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {
{JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet},
{JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet},
{JumpChain: string(KubeMarkMasqChain), MatchSet: kubeExternalIPSet},
{JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With externalTrafficOnlyArgs
{JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With dstLocalOnlyArgs
{JumpChain: string(KubeNodePortChain), MatchSet: ""},
{JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet},
{JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet},
{JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet},
},
}
checkIptables(t, ipt, epIpt)
@ -2242,6 +2396,14 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet,
}, {
JumpChain: string(KubeNodePortChain), MatchSet: "",
}, {
JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet,
}, {
JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet,
}},
string(KubeLoadBalancerChain): {{
JumpChain: "RETURN", MatchSet: kubeLoadBalancerLocalSet,
@ -3658,13 +3820,19 @@ func hasMasqRandomFully(rules []iptablestest.Rule) bool {
return false
}
// checkIptabless to check expected iptables chain and rules
// checkIptables to check expected iptables chain and rules. The got rules must have same number and order as the
// expected rules.
func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) {
for epChain, epRules := range epIpt {
rules := ipt.GetRules(epChain)
for _, epRule := range epRules {
if !hasJump(rules, epRule.JumpChain, epRule.MatchSet) {
t.Errorf("Didn't find jump from chain %v match set %v to %v", epChain, epRule.MatchSet, epRule.JumpChain)
if len(rules) != len(epRules) {
t.Errorf("Expected %d iptables rule in chain %s, got %d", len(epRules), epChain, len(rules))
continue
}
for i, epRule := range epRules {
rule := rules[i]
if rule[iptablestest.Jump] != epRule.JumpChain || !strings.Contains(rule[iptablestest.MatchSet], epRule.MatchSet) {
t.Errorf("Expected MatchSet=%s JumpChain=%s, got MatchSet=%s JumpChain=%s", epRule.MatchSet, epRule.JumpChain, rule[iptablestest.MatchSet], rule[iptablestest.Jump])
}
}
}