This commit is contained in:
Lion-Wei
2018-03-27 19:40:14 +08:00
parent 6762a865db
commit 2acd0abd8c
2 changed files with 216 additions and 107 deletions

View File

@@ -805,16 +805,10 @@ func TestLoadBalancer(t *testing.T) {
fp.syncProxyRules()
}
func strPtr(s string) *string {
return &s
}
func TestOnlyLocalNodePorts(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
nodeIP := net.ParseIP("100.101.102.103")
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
ipt, fp := buildFakeProxier([]net.IP{nodeIP})
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -837,17 +831,13 @@ func TestOnlyLocalNodePorts(t *testing.T) {
}),
)
epIP1 := "10.180.0.1"
epIP2 := "10.180.2.1"
epIP := "10.180.0.1"
makeEndpointsMap(fp,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
IP: epIP1,
IP: epIP,
NodeName: nil,
}, {
IP: epIP2,
NodeName: strPtr(testHostname),
}},
Ports: []api.EndpointPort{{
Name: svcPortName.Port,
@@ -868,40 +858,42 @@ func TestOnlyLocalNodePorts(t *testing.T) {
fp.syncProxyRules()
// Expect 3 services and 1 destination
services, err := ipvs.GetVirtualServers()
if err != nil {
t.Errorf("Failed to get ipvs services, err: %v", err)
epVS := &netlinktest.ExpectedVirtualServer{
VSNum: 3, IP: nodeIP.String(), Port: uint16(svcNodePort), Protocol: string(api.ProtocolTCP),
RS: []netlinktest.ExpectedRealServer{{
IP: epIP, Port: uint16(svcPort),
}}}
checkIPVS(t, fp, epVS)
// check ipSet rules
epEntry := &utilipset.Entry{
Port: svcNodePort,
Protocol: strings.ToLower(string(api.ProtocolTCP)),
SetType: utilipset.BitmapPort,
}
if len(services) != 3 {
t.Errorf("Expect 3 ipvs services, got %d", len(services))
epIPSet := netlinktest.ExpectedIPSet{
KubeNodePortSetTCP: {epEntry},
KubeNodePortLocalSetTCP: {epEntry},
}
found := false
for _, svc := range services {
if svc.Address.Equal(nodeIP) && svc.Port == uint16(svcNodePort) && svc.Protocol == string(api.ProtocolTCP) {
found = true
destinations, err := ipvs.GetRealServers(svc)
if err != nil {
t.Errorf("Failed to get ipvs destinations, err: %v", err)
}
if len(destinations) != 1 {
t.Errorf("Expect 1 ipvs destination, got %d", len(destinations))
} else {
if destinations[0].Address.String() != epIP2 || destinations[0].Port != uint16(svcPort) {
t.Errorf("service Endpoint mismatch ipvs service destination")
}
}
break
}
}
if !found {
t.Errorf("Expect node port type service, got none")
checkIPSet(t, fp, epIPSet)
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeNodePortChain), MatchSet: KubeNodePortSetTCP,
}},
string(KubeNodePortChain): {{
JumpChain: "ACCEPT", MatchSet: KubeNodePortLocalSetTCP,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: "",
}},
}
checkIptables(t, ipt, epIpt)
}
func TestLoadBalanceSourceRanges(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
nodeIP := net.ParseIP("100.101.102.103")
ipt, fp := buildFakeProxier([]net.IP{nodeIP})
svcIP := "10.20.30.41"
svcPort := 80
svcLBIP := "1.2.3.4"
@@ -934,7 +926,7 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
IP: epIP,
NodeName: strPtr(testHostname),
NodeName: nil,
}},
Ports: []api.EndpointPort{{
Name: svcPortName.Port,
@@ -947,73 +939,48 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
fp.syncProxyRules()
// Check ipvs service and destinations
services, err := ipvs.GetVirtualServers()
if err != nil {
t.Errorf("Failed to get ipvs services, err: %v", err)
}
found := false
for _, svc := range services {
fmt.Printf("address: %s:%d, %s", svc.Address.String(), svc.Port, svc.Protocol)
if svc.Address.Equal(net.ParseIP(svcLBIP)) && svc.Port == uint16(svcPort) && svc.Protocol == string(api.ProtocolTCP) {
destinations, _ := ipvs.GetRealServers(svc)
if len(destinations) != 1 {
t.Errorf("Unexpected %d destinations, expect 0 destinations", len(destinations))
}
for _, ep := range destinations {
if ep.Address.String() == epIP && ep.Port == uint16(svcPort) {
found = true
}
}
}
}
if !found {
t.Errorf("Did not got expected loadbalance service")
}
epVS := &netlinktest.ExpectedVirtualServer{
VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(api.ProtocolTCP),
RS: []netlinktest.ExpectedRealServer{{
IP: epIP, Port: uint16(svcPort),
}}}
checkIPVS(t, fp, epVS)
// Check ipset entry
expectIPSet := map[string]*utilipset.Entry{
KubeLoadBalancerSet: {
epIPSet := netlinktest.ExpectedIPSet{
KubeLoadBalancerSet: {{
IP: svcLBIP,
Port: svcPort,
Protocol: strings.ToLower(string(api.ProtocolTCP)),
SetType: utilipset.HashIPPort,
},
KubeLoadBalancerSourceCIDRSet: {
}},
KubeLoadBalancerSourceCIDRSet: {{
IP: svcLBIP,
Port: svcPort,
Protocol: strings.ToLower(string(api.ProtocolTCP)),
Net: svcLBSource,
SetType: utilipset.HashIPPortNet,
},
}
for set, entry := range expectIPSet {
ents, err := ipset.ListEntries(set)
if err != nil || len(ents) != 1 {
t.Errorf("Check ipset entries failed for ipset: %q", set)
continue
}
if ents[0] != entry.String() {
t.Errorf("Check ipset entries failed for ipset: %q", set)
}
}},
}
checkIPSet(t, fp, epIPSet)
// Check iptables chain and rules
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
kubeFWRules := ipt.GetRules(string(KubeFireWallChain))
if !hasJump(kubeSvcRules, string(KubeFireWallChain), KubeLoadBalancerSet) {
t.Errorf("Didn't find jump from chain %v match set %v to %v", kubeServicesChain,
KubeLoadBalancerSet, KubeFireWallChain)
}
if !hasJump(kubeFWRules, "ACCEPT", KubeLoadBalancerSourceCIDRSet) {
t.Errorf("Didn't find jump from chain %v match set %v to ACCEPT", kubeServicesChain, KubeLoadBalancerSourceCIDRSet)
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet,
}},
string(KubeFireWallChain): {{
JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerSourceCIDRSet,
}, {
JumpChain: string(KubeMarkDropChain), MatchSet: "",
}},
}
checkIptables(t, ipt, epIpt)
}
func TestOnlyLocalLoadBalancing(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
ipt, fp := buildFakeProxier(nil)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -1040,17 +1007,13 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
}),
)
epIP1 := "10.180.0.1"
epIP2 := "10.180.2.1"
epIP := "10.180.0.1"
makeEndpointsMap(fp,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
IP: epIP1,
IP: epIP,
NodeName: nil,
}, {
IP: epIP2,
NodeName: strPtr(testHostname),
}},
Ports: []api.EndpointPort{{
Name: svcPortName.Port,
@@ -1061,6 +1024,44 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
)
fp.syncProxyRules()
// Expect 2 services and 1 destination
epVS := &netlinktest.ExpectedVirtualServer{
VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(api.ProtocolTCP),
RS: []netlinktest.ExpectedRealServer{{
IP: epIP, Port: uint16(svcPort),
}}}
checkIPVS(t, fp, epVS)
// check ipSet rules
epIPSet := netlinktest.ExpectedIPSet{
KubeLoadBalancerSet: {{
IP: svcLBIP,
Port: svcPort,
Protocol: strings.ToLower(string(api.ProtocolTCP)),
SetType: utilipset.HashIPPort,
}},
KubeLoadBalancerIngressLocalSet: {{
IP: svcLBIP,
Port: svcPort,
Protocol: strings.ToLower(string(api.ProtocolTCP)),
SetType: utilipset.HashIPPort,
}},
}
checkIPSet(t, fp, epIPSet)
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet,
}},
string(KubeFireWallChain): {{
JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerIngressLocalSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: "",
}},
}
checkIptables(t, ipt, epIpt)
}
func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort {
@@ -2392,18 +2393,75 @@ func Test_syncService(t *testing.T) {
}
}
func buildFakeProxier(nodeIP []net.IP) (*iptablestest.FakeIPTables, *Proxier) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
return ipt, NewFakeProxier(ipt, ipvs, ipset, nil)
}
func hasJump(rules []iptablestest.Rule, destChain, ipSet string) bool {
match := false
for _, r := range rules {
if r[iptablestest.Jump] == destChain {
match = true
if ipSet != "" {
if strings.Contains(r[iptablestest.MatchSet], ipSet) {
return true
}
match = false
if ipSet == "" {
return true
}
if strings.Contains(r[iptablestest.MatchSet], ipSet) {
return true
}
}
}
return false
}
// checkIptabless to check expected iptables chain and rules
func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) {
for epChain, epRules := range epIpt {
rules := ipt.GetRules(epChain)
for _, epRule := range epRules {
if !hasJump(rules, epRule.JumpChain, epRule.MatchSet) {
t.Errorf("Didn't find jump from chain %v match set %v to %v", epChain, epRule.MatchSet, epRule.JumpChain)
}
}
}
}
// checkIPSet to check expected ipset and entries
func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
for set, entries := range ipSet {
ents, err := fp.ipset.ListEntries(set)
if err != nil || len(ents) != len(entries) {
t.Errorf("Check ipset entries failed for ipset: %q", set)
continue
}
if len(entries) == 1 {
if ents[0] != entries[0].String() {
t.Errorf("Check ipset entries failed for ipset: %q", set)
}
}
}
}
// checkIPVS to check expected ipvs service and destination
func checkIPVS(t *testing.T, fp *Proxier, vs *netlinktest.ExpectedVirtualServer) {
services, err := fp.ipvs.GetVirtualServers()
if err != nil {
t.Errorf("Failed to get ipvs services, err: %v", err)
}
if len(services) != vs.VSNum {
t.Errorf("Expect %d ipvs services, got %d", vs.VSNum, len(services))
}
for _, svc := range services {
if svc.Address.String() == vs.IP && svc.Port == vs.Port && svc.Protocol == vs.Protocol {
destinations, _ := fp.ipvs.GetRealServers(svc)
if len(destinations) != len(vs.RS) {
t.Errorf("Expected %d destinations, got %d destinations", len(vs.RS), len(destinations))
}
if len(vs.RS) == 1 {
if destinations[0].Address.String() != vs.RS[0].IP || destinations[0].Port != vs.RS[0].Port {
t.Errorf("Unexpected mismatch destinations")
}
}
}
}
return match
}