fix review

Signed-off-by: Patrik Cyvoct <patrik@ptrk.io>
This commit is contained in:
Patrik Cyvoct
2020-10-22 10:44:04 +02:00
parent d562b6924a
commit 0153b96ab8
11 changed files with 242 additions and 115 deletions

View File

@@ -1175,7 +1175,7 @@ func (proxier *Proxier) syncProxyRules() {
// This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
if ingress.RouteType != v1.LoadBalancerRouteTypeProxy {
if !utilfeature.DefaultFeatureGate.Enabled(features.LoadBalancerIPMode) || ingress.IPMode == nil || *ingress.IPMode == v1.LoadBalancerIPModeVIP {
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),

View File

@@ -26,6 +26,7 @@ import (
"testing"
"time"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"github.com/stretchr/testify/assert"
@@ -34,6 +35,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
@@ -46,6 +48,8 @@ import (
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
utilpointer "k8s.io/utils/pointer"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
@@ -2683,59 +2687,89 @@ COMMIT
}
func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, false)
svcIP := "10.20.30.41"
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, true)()
ipModeProxy := v1.LoadBalancerIPModeProxy
ipModeVIP := v1.LoadBalancerIPModeVIP
testCases := []struct {
svcIP string
svcLBIP string
ipMode *v1.LoadBalancerIPMode
expectedRule bool
}{
{
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedRule: false,
},
{
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedRule: true,
},
{
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedRule: true,
},
}
svcPort := 80
svcNodePort := 3001
svcLBIP := "1.2.3.4"
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: svcLBIP,
RouteType: v1.LoadBalancerRouteTypeProxy,
}}
}),
)
epIP := "10.180.0.1"
makeEndpointsMap(fp,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIP,
}},
Ports: []v1.EndpointPort{{
for _, testCase := range testCases {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, false)
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = testCase.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}},
}}
}),
)
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: testCase.svcLBIP,
IPMode: testCase.ipMode,
}}
}),
)
fp.syncProxyRules()
epIP := "10.180.0.1"
makeEndpointsMap(fp,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIP,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}},
}}
}),
)
proto := strings.ToLower(string(v1.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
fp.syncProxyRules()
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
errorf(fmt.Sprintf("Found jump to firewall chain %v", fwChain), kubeSvcRules, t)
proto := strings.ToLower(string(v1.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if hasJump(kubeSvcRules, fwChain, testCase.svcLBIP, svcPort) != testCase.expectedRule {
errorf(fmt.Sprintf("Found jump to firewall chain %v", fwChain), kubeSvcRules, t)
}
}
}

View File

@@ -1333,7 +1333,7 @@ func (proxier *Proxier) syncProxyRules() {
// Capture load-balancer ingress.
for _, ingress := range svcInfo.LoadBalancerIngress() {
if ingress.IP != "" && ingress.RouteType != v1.LoadBalancerRouteTypeProxy {
if ingress.IP != "" && (!utilfeature.DefaultFeatureGate.Enabled(features.LoadBalancerIPMode) || ingress.IPMode == nil || *ingress.IPMode == v1.LoadBalancerIPModeVIP) {
// ipset call
entry = &utilipset.Entry{
IP: ingress.IP,

View File

@@ -33,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
@@ -51,6 +53,8 @@ import (
utilpointer "k8s.io/utils/pointer"
utilnet "k8s.io/utils/net"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
const testHostname = "test-hostname"
@@ -4322,55 +4326,85 @@ func TestFilterCIDRs(t *testing.T) {
}
func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
_, fp := buildFakeProxier()
svcIP := "10.20.30.41"
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, true)()
ipModeProxy := v1.LoadBalancerIPModeProxy
ipModeVIP := v1.LoadBalancerIPModeVIP
testCases := []struct {
svcIP string
svcLBIP string
ipMode *v1.LoadBalancerIPMode
expectedServices int
}{
{
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedServices: 1,
},
{
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedServices: 2,
},
{
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedServices: 2,
},
}
svcPort := 80
svcNodePort := 3001
svcLBIP := "1.2.3.4"
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
}
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: svcLBIP,
RouteType: v1.LoadBalancerRouteTypeProxy,
}}
}),
)
for _, testCase := range testCases {
_, fp := buildFakeProxier()
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = testCase.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: testCase.svcLBIP,
IPMode: testCase.ipMode,
}}
}),
)
epIP := "10.180.0.1"
makeEndpointsMap(fp,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIP,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
}},
}}
}),
)
epIP := "10.180.0.1"
makeEndpointsMap(fp,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIP,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
}},
}}
}),
)
fp.syncProxyRules()
fp.syncProxyRules()
services, err := fp.ipvs.GetVirtualServers()
if err != nil {
t.Errorf("Failed to get ipvs services, err: %v", err)
}
if len(services) != 1 {
t.Errorf("Expect 1 ipvs services, got %d", len(services))
services, err := fp.ipvs.GetVirtualServers()
if err != nil {
t.Errorf("Failed to get ipvs services, err: %v", err)
}
if len(services) != testCase.expectedServices {
t.Errorf("Expect %d ipvs services, got %d", testCase.expectedServices, len(services))
}
}
}

View File

@@ -164,22 +164,24 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
}
// Obtain Load Balancer Ingress IPs
var ips []string
var allIncorrectIPs []string
for _, ing := range service.Status.LoadBalancer.Ingress {
ips = append(ips, ing.IP)
correctIPs, incorrectIPs := utilproxy.FilterIncorrectIPVersion([]string{ing.IP}, sct.ipFamily)
// len is either 1 or 0
if len(correctIPs) == 1 {
// Update the LoadBalancerStatus with the filtered IPs
info.loadBalancerStatus.Ingress = append(info.loadBalancerStatus.Ingress, ing)
continue
}
// here len(incorrectIPs) == 1
allIncorrectIPs = append(allIncorrectIPs, incorrectIPs[0])
}
if len(ips) > 0 {
correctIPs, incorrectIPs := utilproxy.FilterIncorrectIPVersion(ips, sct.ipFamily)
if len(incorrectIPs) > 0 {
klog.V(4).Infof("service change tracker(%v) ignored the following load balancer(%s) ingress ips for service %v/%v as they don't match IPFamily", sct.ipFamily, strings.Join(incorrectIPs, ","), service.Namespace, service.Name)
if len(incorrectIPs) > 0 {
klog.V(4).Infof("service change tracker(%v) ignored the following load balancer(%s) ingress ips for service %v/%v as they don't match IPFamily", sct.ipFamily, strings.Join(incorrectIPs, ","), service.Namespace, service.Name)
}
// Create the LoadBalancerStatus with the filtered IPs
for _, ip := range correctIPs {
info.loadBalancerStatus.Ingress = append(info.loadBalancerStatus.Ingress, v1.LoadBalancerIngress{IP: ip})
}
}
if apiservice.NeedsHealthCheck(service) {