From 138b8b8aaa5643fdac4ebc6afb84e22c71eb7606 Mon Sep 17 00:00:00 2001 From: Subrata Paul Date: Thu, 15 Aug 2019 21:55:23 +0530 Subject: [PATCH] Fix in kube-proxy for sctp ipset entries Kube-proxy will add ipset entries for all node ips for an SCTP nodeport service. This will solve the problem 'SCTP nodeport service is not working for all IPs present in the node when ipvs is enabled. It is working only for node's InternalIP.' --- pkg/proxy/ipvs/proxier.go | 54 +++++--- pkg/proxy/ipvs/proxier_test.go | 243 ++++++++++++++++++++++++++++++++- 2 files changed, 277 insertions(+), 20 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 6652000d80f..07378245786 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1202,42 +1202,57 @@ func (proxier *Proxier) syncProxyRules() { // Nodeports need SNAT, unless they're local. // ipset call - var nodePortSet *IPSet + var ( + nodePortSet *IPSet + entries []*utilipset.Entry + ) + switch protocol { case "tcp": nodePortSet = proxier.ipsetList[kubeNodePortSetTCP] - entry = &utilipset.Entry{ + entries = []*utilipset.Entry{{ // No need to provide ip info Port: svcInfo.NodePort(), Protocol: protocol, SetType: utilipset.BitmapPort, - } + }} case "udp": nodePortSet = proxier.ipsetList[kubeNodePortSetUDP] - entry = &utilipset.Entry{ + entries = []*utilipset.Entry{{ // No need to provide ip info Port: svcInfo.NodePort(), Protocol: protocol, SetType: utilipset.BitmapPort, - } + }} case "sctp": nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP] - entry = &utilipset.Entry{ - IP: proxier.nodeIP.String(), - Port: svcInfo.NodePort(), - Protocol: protocol, - SetType: utilipset.HashIPPort, + // Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries. + entries = []*utilipset.Entry{} + for _, nodeIP := range nodeIPs { + entries = append(entries, &utilipset.Entry{ + IP: nodeIP.String(), + Port: svcInfo.NodePort(), + Protocol: protocol, + SetType: utilipset.HashIPPort, + }) } default: // It should never hit klog.Errorf("Unsupported protocol type: %s", protocol) } if nodePortSet != nil { - if valid := nodePortSet.validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + entryInvalidErr := false + for _, entry := range entries { + if valid := nodePortSet.validateEntry(entry); !valid { + klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + entryInvalidErr = true + break + } + nodePortSet.activeEntries.Insert(entry.String()) + } + if entryInvalidErr { continue } - nodePortSet.activeEntries.Insert(entry.String()) } // Add externaltrafficpolicy=local type nodeport entry @@ -1255,11 +1270,18 @@ func (proxier *Proxier) syncProxyRules() { klog.Errorf("Unsupported protocol type: %s", protocol) } if nodePortLocalSet != nil { - if valid := nodePortLocalSet.validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name)) + entryInvalidErr := false + for _, entry := range entries { + if valid := nodePortLocalSet.validateEntry(entry); !valid { + klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name)) + entryInvalidErr = true + break + } + nodePortLocalSet.activeEntries.Insert(entry.String()) + } + if entryInvalidErr { continue } - nodePortLocalSet.activeEntries.Insert(entry.String()) } } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 5a69e105c85..a4ba2b2c600 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "reflect" + "sort" "strings" "testing" @@ -695,6 +696,236 @@ func TestNodePort(t *testing.T) { }, }, }, + { + name: "node port service with protocol sctp on a node with multiple nodeIPs", + services: []*v1.Service{ + makeTestService("ns1", "svc1", func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = "10.20.30.41" + svc.Spec.Ports = []v1.ServicePort{{ + Name: "p80", + Port: int32(80), + Protocol: v1.ProtocolSCTP, + NodePort: int32(3001), + }} + }), + }, + endpoints: []*v1.Endpoints{ + makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: "10.180.0.1", + }}, + Ports: []v1.EndpointPort{{ + Name: "p80", + Port: int32(80), + }}, + }} + }), + }, + nodeIPs: []net.IP{ + net.ParseIP("100.101.102.103"), + net.ParseIP("100.101.102.104"), + net.ParseIP("100.101.102.105"), + net.ParseIP("2001:db8::1:1"), + net.ParseIP("2001:db8::1:2"), + net.ParseIP("2001:db8::1:3"), + }, + nodePortAddresses: []string{}, + expectedIPVS: &ipvstest.FakeIPVS{ + Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ + { + IP: "10.20.30.41", + Port: 80, + Protocol: "SCTP", + }: { + Address: net.ParseIP("10.20.30.41"), + Protocol: "SCTP", + Port: uint16(80), + Scheduler: "rr", + }, + { + IP: "100.101.102.103", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("100.101.102.103"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "100.101.102.104", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("100.101.102.104"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "100.101.102.105", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("100.101.102.105"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "2001:db8::1:1", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("2001:db8::1:1"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "2001:db8::1:2", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("2001:db8::1:2"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + { + IP: "2001:db8::1:3", + Port: 3001, + Protocol: "SCTP", + }: { + Address: net.ParseIP("2001:db8::1:3"), + Protocol: "SCTP", + Port: uint16(3001), + Scheduler: "rr", + }, + }, + Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{ + { + IP: "10.20.30.41", + Port: 80, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "100.101.102.103", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "100.101.102.104", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "100.101.102.105", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "2001:db8::1:1", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "2001:db8::1:2", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + { + IP: "2001:db8::1:3", + Port: 3001, + Protocol: "SCTP", + }: { + { + Address: net.ParseIP("10.180.0.1"), + Port: uint16(80), + Weight: 1, + }, + }, + }, + }, + expectedIPSets: netlinktest.ExpectedIPSet{ + kubeNodePortSetSCTP: { + { + IP: "100.101.102.103", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "100.101.102.104", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "100.101.102.105", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "2001:db8::1:1", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "2001:db8::1:2", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + { + IP: "2001:db8::1:3", + Port: 3001, + Protocol: strings.ToLower(string(v1.ProtocolSCTP)), + SetType: utilipset.HashIPPort, + }, + }, + }, + }, } for _, test := range tests { @@ -2897,10 +3128,14 @@ func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) { t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents)) continue } - if len(entries) == 1 { - if ents[0] != entries[0].String() { - t.Errorf("Check ipset entries failed for ipset: %q", set) - } + expectedEntries := []string{} + for _, entry := range entries { + expectedEntries = append(expectedEntries, entry.String()) + } + sort.Strings(ents) + sort.Strings(expectedEntries) + if !reflect.DeepEqual(ents, expectedEntries) { + t.Errorf("Check ipset entries failed for ipset: %q", set) } } }