diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index e2129d944f9..72ca19383ce 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1204,42 +1204,57 @@ func (proxier *Proxier) syncProxyRules() { // Nodeports need SNAT, unless they're local. // ipset call - var nodePortSet *IPSet + var ( + nodePortSet *IPSet + entries []*utilipset.Entry + ) + switch protocol { case "tcp": nodePortSet = proxier.ipsetList[kubeNodePortSetTCP] - entry = &utilipset.Entry{ + entries = []*utilipset.Entry{{ // No need to provide ip info Port: svcInfo.NodePort(), Protocol: protocol, SetType: utilipset.BitmapPort, - } + }} case "udp": nodePortSet = proxier.ipsetList[kubeNodePortSetUDP] - entry = &utilipset.Entry{ + entries = []*utilipset.Entry{{ // No need to provide ip info Port: svcInfo.NodePort(), Protocol: protocol, SetType: utilipset.BitmapPort, - } + }} case "sctp": nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP] - entry = &utilipset.Entry{ - IP: proxier.nodeIP.String(), - Port: svcInfo.NodePort(), - Protocol: protocol, - SetType: utilipset.HashIPPort, + // Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries. + entries = []*utilipset.Entry{} + for _, nodeIP := range nodeIPs { + entries = append(entries, &utilipset.Entry{ + IP: nodeIP.String(), + Port: svcInfo.NodePort(), + Protocol: protocol, + SetType: utilipset.HashIPPort, + }) } default: // It should never hit klog.Errorf("Unsupported protocol type: %s", protocol) } if nodePortSet != nil { - if valid := nodePortSet.validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + entryInvalidErr := false + for _, entry := range entries { + if valid := nodePortSet.validateEntry(entry); !valid { + klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + entryInvalidErr = true + break + } + nodePortSet.activeEntries.Insert(entry.String()) + } + if entryInvalidErr { continue } - nodePortSet.activeEntries.Insert(entry.String()) } // Add externaltrafficpolicy=local type nodeport entry @@ -1257,11 +1272,18 @@ func (proxier *Proxier) syncProxyRules() { klog.Errorf("Unsupported protocol type: %s", protocol) } if nodePortLocalSet != nil { - if valid := nodePortLocalSet.validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name)) + entryInvalidErr := false + for _, entry := range entries { + if valid := nodePortLocalSet.validateEntry(entry); !valid { + klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name)) + entryInvalidErr = true + break + } + nodePortLocalSet.activeEntries.Insert(entry.String()) + } + if entryInvalidErr { continue } - nodePortLocalSet.activeEntries.Insert(entry.String()) } } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 5a69e105c85..a4ba2b2c600 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "reflect" + "sort" "strings" "testing" @@ -695,6 +696,236 @@ func TestNodePort(t *testing.T) { }, }, }, + { + name: "node port service with protocol sctp on a node with multiple nodeIPs", + services: []*v1.Service{ + makeTestService("ns1", "svc1", func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = "10.20.30.41" + svc.Spec.Ports = []v1.ServicePort{{ + Name: "p80", + Port: int32(80), + Protocol: v1.ProtocolSCTP, + NodePort: int32(3001), + }} + }), + }, + endpoints: []*v1.Endpoints{ + makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: "10.180.0.1", + }}, + Ports: []v1.EndpointPort{{ + Name: "p80", + Port: int32(80), + }}, + }} + }), + }, + nodeIPs: []net.IP{ + net.ParseIP("100.101.102.103"), + net.ParseIP("100.101.102.104"), + net.ParseIP("100.101.102.105"), + net.ParseIP("2001:db8::1:1"), + net.ParseIP("2001:db8::1:2"), + net.ParseIP("2001:db8::1:3"), + }, + nodePortAddresses: []string{}, + expectedIPVS: &ipvstest.FakeIPVS{ + Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ + { + IP: "10.20.30.41", + Port: 80, + Protocol: "SCTP", + }: { + Address: net.ParseIP("10.20.30.41"), + Protocol: "SCTP", + Port: uint16(80), + Scheduler: "rr", + }, + { + IP: "100.101.102.103", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("100.101.102.103"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "100.101.102.104", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("100.101.102.104"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "100.101.102.105", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("100.101.102.105"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "2001:db8::1:1", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("2001:db8::1:1"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "2001:db8::1:2", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("2001:db8::1:2"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "2001:db8::1:3", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("2001:db8::1:3"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + }, + Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ + { + IP: "10.20.30.41", + Port: 80, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "100.101.102.103", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "100.101.102.104", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "100.101.102.105", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "2001:db8::1:1", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "2001:db8::1:2", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "2001:db8::1:3", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + }, + }, + expectedIPSets: netlinktest.ExpectedIPSet{ + kubeNodePortSetSCTP: { + { + IP: "100.101.102.103", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "100.101.102.104", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "100.101.102.105", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "2001:db8::1:1", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "2001:db8::1:2", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "2001:db8::1:3", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + }, + }, + }, } for _, test := range tests { @@ -2897,10 +3128,14 @@ func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) { t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents)) continue } - if len(entries) == 1 { - if ents[0] != entries[0].String() { - t.Errorf("Check ipset entries failed for ipset: %q", set) - } + expectedEntries := []string{} + for _, entry := range entries { + expectedEntries = append(expectedEntries, entry.String()) + } + sort.Strings(ents) + sort.Strings(expectedEntries) + if !reflect.DeepEqual(ents, expectedEntries) { + t.Errorf("Check ipset entries failed for ipset: %q", set) } } }