Merge pull request #81477 from paulsubrata55/kube-proxy-sctp-ipset-fix

Fix in kube-proxy for sctp ipset entries
This commit is contained in:
Kubernetes Prow Robot 2019-08-28 18:26:09 -07:00 committed by GitHub
commit bd8a8db515
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 277 additions and 20 deletions

View File

@ -1204,42 +1204,57 @@ func (proxier *Proxier) syncProxyRules() {
// Nodeports need SNAT, unless they're local.
// ipset call
var nodePortSet *IPSet
var (
nodePortSet *IPSet
entries []*utilipset.Entry
)
switch protocol {
case "tcp":
nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
entry = &utilipset.Entry{
entries = []*utilipset.Entry{{
// No need to provide ip info
Port: svcInfo.NodePort(),
Protocol: protocol,
SetType: utilipset.BitmapPort,
}
}}
case "udp":
nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
entry = &utilipset.Entry{
entries = []*utilipset.Entry{{
// No need to provide ip info
Port: svcInfo.NodePort(),
Protocol: protocol,
SetType: utilipset.BitmapPort,
}
}}
case "sctp":
nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
entry = &utilipset.Entry{
IP: proxier.nodeIP.String(),
Port: svcInfo.NodePort(),
Protocol: protocol,
SetType: utilipset.HashIPPort,
// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
entries = []*utilipset.Entry{}
for _, nodeIP := range nodeIPs {
entries = append(entries, &utilipset.Entry{
IP: nodeIP.String(),
Port: svcInfo.NodePort(),
Protocol: protocol,
SetType: utilipset.HashIPPort,
})
}
default:
// It should never hit
klog.Errorf("Unsupported protocol type: %s", protocol)
}
if nodePortSet != nil {
if valid := nodePortSet.validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortSet.validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
entryInvalidErr = true
break
}
nodePortSet.activeEntries.Insert(entry.String())
}
if entryInvalidErr {
continue
}
nodePortSet.activeEntries.Insert(entry.String())
}
// Add externaltrafficpolicy=local type nodeport entry
@ -1257,11 +1272,18 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("Unsupported protocol type: %s", protocol)
}
if nodePortLocalSet != nil {
if valid := nodePortLocalSet.validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortLocalSet.validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
entryInvalidErr = true
break
}
nodePortLocalSet.activeEntries.Insert(entry.String())
}
if entryInvalidErr {
continue
}
nodePortLocalSet.activeEntries.Insert(entry.String())
}
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"testing"
@ -695,6 +696,236 @@ func TestNodePort(t *testing.T) {
},
},
},
{
name: "node port service with protocol sctp on a node with multiple nodeIPs",
services: []*v1.Service{
makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = "10.20.30.41"
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: int32(80),
Protocol: v1.ProtocolSCTP,
NodePort: int32(3001),
}}
}),
},
endpoints: []*v1.Endpoints{
makeTestEndpoints("ns1", "svc1", func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: "10.180.0.1",
}},
Ports: []v1.EndpointPort{{
Name: "p80",
Port: int32(80),
}},
}}
}),
},
nodeIPs: []net.IP{
net.ParseIP("100.101.102.103"),
net.ParseIP("100.101.102.104"),
net.ParseIP("100.101.102.105"),
net.ParseIP("2001:db8::1:1"),
net.ParseIP("2001:db8::1:2"),
net.ParseIP("2001:db8::1:3"),
},
nodePortAddresses: []string{},
expectedIPVS: &ipvstest.FakeIPVS{
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
Address: net.ParseIP("10.20.30.41"),
Protocol: "SCTP",
Port: uint16(80),
Scheduler: "rr",
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("100.101.102.103"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "100.101.102.104",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("100.101.102.104"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "100.101.102.105",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("100.101.102.105"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "2001:db8::1:1",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("2001:db8::1:1"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "2001:db8::1:2",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("2001:db8::1:2"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
{
IP: "2001:db8::1:3",
Port: 3001,
Protocol: "SCTP",
}: {
Address: net.ParseIP("2001:db8::1:3"),
Protocol: "SCTP",
Port: uint16(3001),
Scheduler: "rr",
},
},
Destinations: map[ipvstest.ServiceKey][]*utilipvs.RealServer{
{
IP: "10.20.30.41",
Port: 80,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "100.101.102.103",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "100.101.102.104",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "100.101.102.105",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "2001:db8::1:1",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "2001:db8::1:2",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
{
IP: "2001:db8::1:3",
Port: 3001,
Protocol: "SCTP",
}: {
{
Address: net.ParseIP("10.180.0.1"),
Port: uint16(80),
Weight: 1,
},
},
},
},
expectedIPSets: netlinktest.ExpectedIPSet{
kubeNodePortSetSCTP: {
{
IP: "100.101.102.103",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "100.101.102.104",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "100.101.102.105",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "2001:db8::1:1",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "2001:db8::1:2",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
{
IP: "2001:db8::1:3",
Port: 3001,
Protocol: strings.ToLower(string(v1.ProtocolSCTP)),
SetType: utilipset.HashIPPort,
},
},
},
},
}
for _, test := range tests {
@ -2897,10 +3128,14 @@ func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents))
continue
}
if len(entries) == 1 {
if ents[0] != entries[0].String() {
t.Errorf("Check ipset entries failed for ipset: %q", set)
}
expectedEntries := []string{}
for _, entry := range entries {
expectedEntries = append(expectedEntries, entry.String())
}
sort.Strings(ents)
sort.Strings(expectedEntries)
if !reflect.DeepEqual(ents, expectedEntries) {
t.Errorf("Check ipset entries failed for ipset: %q", set)
}
}
}