Fix in kube-proxy for sctp ipset entries

Kube-proxy will add ipset entries for all node ips for an SCTP nodeport service. This will solve the problem 'SCTP nodeport service is not working for all IPs present in the node when ipvs is enabled. It is working only for node's InternalIP.'
This commit is contained in:
Subrata Paul 2019-08-15 21:55:23 +05:30 committed by Subrata Paul
parent 0d579bfecf
commit 138b8b8aaa
2 changed files with 277 additions and 20 deletions

View File

@ -1202,43 +1202,58 @@ func (proxier *Proxier) syncProxyRules() {
// Nodeports need SNAT, unless they're local. // Nodeports need SNAT, unless they're local.
// ipset call // ipset call
var nodePortSet *IPSet var (
nodePortSet *IPSet
entries []*utilipset.Entry
)
switch protocol { switch protocol {
case "tcp": case "tcp":
nodePortSet = proxier.ipsetList[kubeNodePortSetTCP] nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
entry = &utilipset.Entry{ entries = []*utilipset.Entry{{
// No need to provide ip info // No need to provide ip info
Port: svcInfo.NodePort(), Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.BitmapPort, SetType: utilipset.BitmapPort,
} }}
case "udp": case "udp":
nodePortSet = proxier.ipsetList[kubeNodePortSetUDP] nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
entry = &utilipset.Entry{ entries = []*utilipset.Entry{{
// No need to provide ip info // No need to provide ip info
Port: svcInfo.NodePort(), Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.BitmapPort, SetType: utilipset.BitmapPort,
} }}
case "sctp": case "sctp":
nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP] nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
entry = &utilipset.Entry{ // Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
IP: proxier.nodeIP.String(), entries = []*utilipset.Entry{}
for _, nodeIP := range nodeIPs {
entries = append(entries, &utilipset.Entry{
IP: nodeIP.String(),
Port: svcInfo.NodePort(), Port: svcInfo.NodePort(),
Protocol: protocol, Protocol: protocol,
SetType: utilipset.HashIPPort, SetType: utilipset.HashIPPort,
})
} }
default: default:
// It should never hit // It should never hit
klog.Errorf("Unsupported protocol type: %s", protocol) klog.Errorf("Unsupported protocol type: %s", protocol)
} }
if nodePortSet != nil { if nodePortSet != nil {
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortSet.validateEntry(entry); !valid { if valid := nodePortSet.validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
continue entryInvalidErr = true
break
} }
nodePortSet.activeEntries.Insert(entry.String()) nodePortSet.activeEntries.Insert(entry.String())
} }
if entryInvalidErr {
continue
}
}
// Add externaltrafficpolicy=local type nodeport entry // Add externaltrafficpolicy=local type nodeport entry
if svcInfo.OnlyNodeLocalEndpoints() { if svcInfo.OnlyNodeLocalEndpoints() {
@ -1255,12 +1270,19 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("Unsupported protocol type: %s", protocol) klog.Errorf("Unsupported protocol type: %s", protocol)
} }
if nodePortLocalSet != nil { if nodePortLocalSet != nil {
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortLocalSet.validateEntry(entry); !valid { if valid := nodePortLocalSet.validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name)) klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
continue entryInvalidErr = true
break
} }
nodePortLocalSet.activeEntries.Insert(entry.String()) nodePortLocalSet.activeEntries.Insert(entry.String())
} }
if entryInvalidErr {
continue
}
}
} }
// Build ipvs kernel routes for each node ip address // Build ipvs kernel routes for each node ip address

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"net" "net"
"reflect" "reflect"
"sort"
"strings" "strings"
"testing" "testing"
@ -695,6 +696,236 @@ func TestNodePort(t *testing.T) {
}, },
}, },
}, },
{
name: "node port service with protocol sctp on a node with multiple nodeIPs",
services: []*v1.Service{
makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = "10.20.30.41"
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: int32(80),
Protocol: v1.ProtocolSCTP,
NodePort: int32(3001),
}}
}),
},
endpoints: []*v1.Endpoints{
makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: "10.180.0.1",
}},
Ports: []v1.EndpointPort{{
Name: "p80",
Port: int32(80),
}},
}}
}),
},
nodeIPs: []net.IP{
net.ParseIP("100.101.102.103"),
net.ParseIP("100.101.102.104"),
net.ParseIP("100.101.102.105"),
net.ParseIP("2001:db8::1:1"),
net.ParseIP("2001:db8::1:2"),
net.ParseIP("2001:db8::1:3"),
},
nodePortAddresses: []string{},
expectedIPVS: &ipvstest.FakeIPVS{
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
Address: net.ParseIP("10.20.30.41"),
Protocol: "SCTP",
Port: uint16(80),
Scheduler: "rr",
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("100.101.102.103"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "100.101.102.104",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("100.101.102.104"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "100.101.102.105",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("100.101.102.105"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "2001:db8::1:1",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("2001:db8::1:1"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "2001:db8::1:2",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("2001:db8::1:2"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "2001:db8::1:3",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("2001:db8::1:3"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
},
Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "100.101.102.104",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "100.101.102.105",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "2001:db8::1:1",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "2001:db8::1:2",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "2001:db8::1:3",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
},
},
expectedIPSets: netlinktest.ExpectedIPSet{
kubeNodePortSetSCTP: {
{
IP: "100.101.102.103",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "100.101.102.104",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "100.101.102.105",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "2001:db8::1:1",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "2001:db8::1:2",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "2001:db8::1:3",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
},
},
},
} }
for _, test := range tests { for _, test := range tests {
@ -2897,10 +3128,14 @@ func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents)) t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents))
continue continue
} }
if len(entries) == 1 { expectedEntries := []string{}
if ents[0] != entries[0].String() { for _, entry := range entries {
t.Errorf("Check ipset entries failed for ipset: %q", set) expectedEntries = append(expectedEntries, entry.String())
} }
sort.Strings(ents)
sort.Strings(expectedEntries)
if !reflect.DeepEqual(ents, expectedEntries) {
t.Errorf("Check ipset entries failed for ipset: %q", set)
} }
} }
} }