diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 5e7bd4c645e..86b7db41072 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -191,8 +191,8 @@ var ipsetWithIptablesChain = []struct { {kubeNodePortSetTCP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolTCP}, {kubeNodePortLocalSetUDP, string(KubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP}, {kubeNodePortSetUDP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst", utilipset.ProtocolUDP}, - {kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP}, {kubeNodePortLocalSetSCTP, string(KubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP}, + {kubeNodePortSetSCTP, string(KubeNodePortChain), string(KubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP}, } // In IPVS proxy mode, the following flags need to be set diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index a7bd9e51fa8..0b83fac558c 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -654,9 +654,15 @@ func TestNodePortIPv4(t *testing.T) { expectedIptablesChains: netlinktest.ExpectedIptablesChain{ string(KubeNodePortChain): {{ JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetUDP, + }, { + JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, string(kubeServicesChain): {{ + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, }}, }, }, @@ -865,6 +871,123 @@ func TestNodePortIPv4(t *testing.T) { }, }, }, + { + name: "node port service with protocol sctp and externalTrafficPolicy local", + services: []*v1.Service{ + makeTestService("ns1", "svc1", func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = "10.20.30.41" + svc.Spec.Ports = []v1.ServicePort{{ + Name: "p80", + Port: int32(80), + Protocol: v1.ProtocolSCTP, + NodePort: int32(3001), + }} + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + }), + }, + endpoints: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"10.180.0.1"}, + NodeName: utilpointer.StringPtr(testHostname), + }, { + Addresses: []string{"10.180.1.1"}, + NodeName: utilpointer.StringPtr("otherHost"), + }} + eps.Ports = []discovery.EndpointPort{{ + Name: utilpointer.String("p80"), + Port: utilpointer.Int32(80), + Protocol: &sctpProtocol, + }} + }), + }, + nodeIPs: []net.IP{ + netutils.ParseIPSloppy("100.101.102.103"), + }, + nodePortAddresses: []string{}, + expectedIPVS: &ipvstest.FakeIPVS{ + Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ + { + IP: "10.20.30.41", + Port: 80, + Protocol: "SCTP", + }: { + Address: netutils.ParseIPSloppy("10.20.30.41"), + Protocol: "SCTP", + Port: uint16(80), + Scheduler: "rr", + }, + { + IP: "100.101.102.103", + Port: 3001, + Protocol: "SCTP", + }: { + Address: netutils.ParseIPSloppy("100.101.102.103"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + }, + Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ + { + IP: "10.20.30.41", + Port: 80, + Protocol: "SCTP", + }: { + { + Address: netutils.ParseIPSloppy("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + { + Address: netutils.ParseIPSloppy("10.180.1.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "100.101.102.103", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: netutils.ParseIPSloppy("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + }, + }, + expectedIPSets: netlinktest.ExpectedIPSet{ + kubeNodePortSetSCTP: { + { + IP: "100.101.102.103", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + }, + kubeNodePortLocalSetSCTP: { + { + IP: "100.101.102.103", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + }, + }, + expectedIptablesChains: netlinktest.ExpectedIptablesChain{ + string(KubeNodePortChain): {{ + JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetSCTP, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetSCTP, + }, { + JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, + }}, + }, + }, } for _, test := range tests { @@ -1822,6 +1945,14 @@ func TestLoadBalancer(t *testing.T) { epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { + JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, + }, { + JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet, }}, string(kubeLoadBalancerSet): {{ JumpChain: string(KubeMarkMasqChain), MatchSet: "", @@ -1913,12 +2044,18 @@ func TestOnlyLocalNodePorts(t *testing.T) { // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, }}, string(KubeNodePortChain): {{ JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP, }, { JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP, + }, { + JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, } checkIptables(t, ipt, epIpt) @@ -1987,6 +2124,10 @@ func TestHealthCheckNodePort(t *testing.T) { // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(KubeNodePortChain): {{ + JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeNodePortSetTCP, + }, { JumpChain: "ACCEPT", MatchSet: kubeHealthCheckNodePortSet, }}, } @@ -2076,6 +2217,14 @@ func TestLoadBalanceSourceRanges(t *testing.T) { epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { + JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, + }, { + JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet, }}, string(KubeLoadBalancerChain): {{ JumpChain: string(KubeFireWallChain), MatchSet: kubeLoadbalancerFWSet, @@ -2148,9 +2297,14 @@ func TestAcceptIPVSTraffic(t *testing.T) { // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): { + {JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet}, + {JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet}, + {JumpChain: string(KubeMarkMasqChain), MatchSet: kubeExternalIPSet}, + {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With externalTrafficOnlyArgs + {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, // With dstLocalOnlyArgs + {JumpChain: string(KubeNodePortChain), MatchSet: ""}, {JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet}, {JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet}, - {JumpChain: "ACCEPT", MatchSet: kubeExternalIPSet}, }, } checkIptables(t, ipt, epIpt) @@ -2242,6 +2396,14 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ JumpChain: string(KubeLoadBalancerChain), MatchSet: kubeLoadBalancerSet, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: kubeClusterIPSet, + }, { + JumpChain: string(KubeNodePortChain), MatchSet: "", + }, { + JumpChain: "ACCEPT", MatchSet: kubeClusterIPSet, + }, { + JumpChain: "ACCEPT", MatchSet: kubeLoadBalancerSet, }}, string(KubeLoadBalancerChain): {{ JumpChain: "RETURN", MatchSet: kubeLoadBalancerLocalSet, @@ -3658,13 +3820,19 @@ func hasMasqRandomFully(rules []iptablestest.Rule) bool { return false } -// checkIptabless to check expected iptables chain and rules +// checkIptables to check expected iptables chain and rules. The got rules must have same number and order as the +// expected rules. func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) { for epChain, epRules := range epIpt { rules := ipt.GetRules(epChain) - for _, epRule := range epRules { - if !hasJump(rules, epRule.JumpChain, epRule.MatchSet) { - t.Errorf("Didn't find jump from chain %v match set %v to %v", epChain, epRule.MatchSet, epRule.JumpChain) + if len(rules) != len(epRules) { + t.Errorf("Expected %d iptables rule in chain %s, got %d", len(epRules), epChain, len(rules)) + continue + } + for i, epRule := range epRules { + rule := rules[i] + if rule[iptablestest.Jump] != epRule.JumpChain || !strings.Contains(rule[iptablestest.MatchSet], epRule.MatchSet) { + t.Errorf("Expected MatchSet=%s JumpChain=%s, got MatchSet=%s JumpChain=%s", epRule.MatchSet, epRule.JumpChain, rule[iptablestest.MatchSet], rule[iptablestest.Jump]) } } }