Proxy changes for IP mode field

This commit is contained in:
Aohan Yang 2023-07-06 16:48:52 +08:00
parent 7c6e399b22
commit 7eab0d7a0d
9 changed files with 314 additions and 62 deletions

View File

@ -50,7 +50,7 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP) conntrackCleanupServiceIPs.Insert(extIP)
} }
for _, lbIP := range svcInfo.LoadBalancerIPStrings() { for _, lbIP := range svcInfo.LoadBalancerVIPStrings() {
conntrackCleanupServiceIPs.Insert(lbIP) conntrackCleanupServiceIPs.Insert(lbIP)
} }
nodePort := svcInfo.NodePort() nodePort := svcInfo.NodePort()
@ -100,7 +100,7 @@ func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap pro
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
} }
} }
for _, lbIP := range svcInfo.LoadBalancerIPStrings() { for _, lbIP := range svcInfo.LoadBalancerVIPStrings() {
err := ClearEntriesForNAT(exec, lbIP, endpointIP, v1.ProtocolUDP) err := ClearEntriesForNAT(exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)

View File

@ -1032,7 +1032,7 @@ func (proxier *Proxier) syncProxyRules() {
// create a firewall chain. // create a firewall chain.
loadBalancerTrafficChain := externalTrafficChain loadBalancerTrafficChain := externalTrafficChain
fwChain := svcInfo.firewallChainName fwChain := svcInfo.firewallChainName
usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0 usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
if usesFWChain { if usesFWChain {
activeNATChains[fwChain] = true activeNATChains[fwChain] = true
loadBalancerTrafficChain = fwChain loadBalancerTrafficChain = fwChain
@ -1124,7 +1124,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, lbip := range svcInfo.LoadBalancerIPStrings() { for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
if hasEndpoints { if hasEndpoints {
natRules.Write( natRules.Write(
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
@ -1149,7 +1149,7 @@ func (proxier *Proxier) syncProxyRules() {
// Either no endpoints at all (REJECT) or no endpoints for // Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get short-circuited // external traffic (DROP anything that didn't get short-circuited
// by the EXT chain.) // by the EXT chain.)
for _, lbip := range svcInfo.LoadBalancerIPStrings() { for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
filterRules.Write( filterRules.Write(
"-A", string(kubeExternalServicesChain), "-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", externalTrafficFilterComment, "-m", "comment", "--comment", externalTrafficFilterComment,
@ -1327,7 +1327,7 @@ func (proxier *Proxier) syncProxyRules() {
// will loop back with the source IP set to the VIP. We // will loop back with the source IP set to the VIP. We
// need the following rules to allow requests from this node. // need the following rules to allow requests from this node.
if allowFromNode { if allowFromNode {
for _, lbip := range svcInfo.LoadBalancerIPStrings() { for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
natRules.Write( natRules.Write(
args, args,
"-s", lbip, "-s", lbip,

View File

@ -38,8 +38,11 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack" "k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/metrics"
@ -8276,3 +8279,128 @@ func TestNoEndpointsMetric(t *testing.T) {
}) })
} }
} }
func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
ipModeProxy := v1.LoadBalancerIPModeProxy
ipModeVIP := v1.LoadBalancerIPModeVIP
testCases := []struct {
name string
ipModeEnabled bool
svcIP string
svcLBIP string
ipMode *v1.LoadBalancerIPMode
expectedRule bool
}{
/* LoadBalancerIPMode disabled */
{
name: "LoadBalancerIPMode disabled, ipMode Proxy",
ipModeEnabled: false,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedRule: true,
},
{
name: "LoadBalancerIPMode disabled, ipMode VIP",
ipModeEnabled: false,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedRule: true,
},
{
name: "LoadBalancerIPMode disabled, ipMode nil",
ipModeEnabled: false,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedRule: true,
},
/* LoadBalancerIPMode enabled */
{
name: "LoadBalancerIPMode enabled, ipMode Proxy",
ipModeEnabled: true,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedRule: false,
},
{
name: "LoadBalancerIPMode enabled, ipMode VIP",
ipModeEnabled: true,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedRule: true,
},
{
name: "LoadBalancerIPMode enabled, ipMode nil",
ipModeEnabled: true,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedRule: true,
},
}
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = testCase.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: testCase.svcLBIP,
IPMode: testCase.ipMode,
}}
}),
)
tcpProtocol := v1.ProtocolTCP
populateEndpointSlices(fp,
makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"10.180.0.1"},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String("p80"),
Port: pointer.Int32(80),
Protocol: &tcpProtocol,
}}
}),
)
fp.syncProxyRules()
c, _ := ipt.Dump.GetChain(utiliptables.TableNAT, kubeServicesChain)
ruleExists := false
for _, r := range c.Rules {
if r.DestinationAddress != nil && r.DestinationAddress.Value == testCase.svcLBIP {
ruleExists = true
}
}
if ruleExists != testCase.expectedRule {
t.Errorf("unexpected rule for %s", testCase.svcLBIP)
}
})
}
}

View File

@ -1172,7 +1172,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, ingress := range svcInfo.LoadBalancerIPStrings() { for _, ingress := range svcInfo.LoadBalancerVIPStrings() {
// ipset call // ipset call
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: ingress, IP: ingress,

View File

@ -36,7 +36,10 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/healthcheck"
utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset" utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
@ -5836,3 +5839,123 @@ func TestDismissLocalhostRuleExist(t *testing.T) {
}) })
} }
} }
func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
ipModeProxy := v1.LoadBalancerIPModeProxy
ipModeVIP := v1.LoadBalancerIPModeVIP
testCases := []struct {
name string
ipModeEnabled bool
svcIP string
svcLBIP string
ipMode *v1.LoadBalancerIPMode
expectedServices int
}{
/* LoadBalancerIPMode disabled */
{
name: "LoadBalancerIPMode disabled, ipMode Proxy",
ipModeEnabled: false,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedServices: 2,
},
{
name: "LoadBalancerIPMode disabled, ipMode VIP",
ipModeEnabled: false,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedServices: 2,
},
{
name: "LoadBalancerIPMode disabled, ipMode nil",
ipModeEnabled: false,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedServices: 2,
},
/* LoadBalancerIPMode enabled */
{
name: "LoadBalancerIPMode enabled, ipMode Proxy",
ipModeEnabled: true,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedServices: 1,
},
{
name: "LoadBalancerIPMode enabled, ipMode VIP",
ipModeEnabled: true,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedServices: 2,
},
{
name: "LoadBalancerIPMode enabled, ipMode nil",
ipModeEnabled: true,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedServices: 2,
},
}
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)()
_, fp := buildFakeProxier()
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = testCase.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: testCase.svcLBIP,
IPMode: testCase.ipMode,
}}
}),
)
tcpProtocol := v1.ProtocolTCP
makeEndpointSliceMap(fp,
makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"10.180.0.1"},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String("p80"),
Port: pointer.Int32(80),
Protocol: &tcpProtocol,
}}
}),
)
fp.syncProxyRules()
services, err := fp.ipvs.GetVirtualServers()
if err != nil {
t.Errorf("Failed to get ipvs services, err: %v", err)
}
if len(services) != testCase.expectedServices {
t.Errorf("Expected %d ipvs services, got %d", testCase.expectedServices, len(services))
}
})
}
}

View File

@ -44,7 +44,7 @@ type BaseServicePortInfo struct {
port int port int
protocol v1.Protocol protocol v1.Protocol
nodePort int nodePort int
loadBalancerStatus v1.LoadBalancerStatus loadBalancerVIPs []string
sessionAffinityType v1.ServiceAffinity sessionAffinityType v1.ServiceAffinity
stickyMaxAgeSeconds int stickyMaxAgeSeconds int
externalIPs []string externalIPs []string
@ -108,13 +108,9 @@ func (bsvcPortInfo *BaseServicePortInfo) ExternalIPStrings() []string {
return bsvcPortInfo.externalIPs return bsvcPortInfo.externalIPs
} }
// LoadBalancerIPStrings is part of ServicePort interface. // LoadBalancerVIPStrings is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerIPStrings() []string { func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerVIPStrings() []string {
var ips []string return bsvcPortInfo.loadBalancerVIPs
for _, ing := range bsvcPortInfo.loadBalancerStatus.Ingress {
ips = append(ips, ing.IP)
}
return ips
} }
// ExternalPolicyLocal is part of ServicePort interface. // ExternalPolicyLocal is part of ServicePort interface.
@ -139,7 +135,7 @@ func (bsvcPortInfo *BaseServicePortInfo) HintsAnnotation() string {
// ExternallyAccessible is part of ServicePort interface. // ExternallyAccessible is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ExternallyAccessible() bool { func (bsvcPortInfo *BaseServicePortInfo) ExternallyAccessible() bool {
return bsvcPortInfo.nodePort != 0 || len(bsvcPortInfo.loadBalancerStatus.Ingress) != 0 || len(bsvcPortInfo.externalIPs) != 0 return bsvcPortInfo.nodePort != 0 || len(bsvcPortInfo.loadBalancerVIPs) != 0 || len(bsvcPortInfo.externalIPs) != 0
} }
// UsesClusterEndpoints is part of ServicePort interface. // UsesClusterEndpoints is part of ServicePort interface.
@ -211,25 +207,21 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
"ipFamily", sct.ipFamily, "loadBalancerSourceRanges", strings.Join(cidrs, ", "), "service", klog.KObj(service)) "ipFamily", sct.ipFamily, "loadBalancerSourceRanges", strings.Join(cidrs, ", "), "service", klog.KObj(service))
} }
// Obtain Load Balancer Ingress IPs // Obtain Load Balancer Ingress
var ips []string var invalidIPs []string
for _, ing := range service.Status.LoadBalancer.Ingress { for _, ing := range service.Status.LoadBalancer.Ingress {
if ing.IP != "" { if ing.IP == "" {
ips = append(ips, ing.IP) continue
}
if ipFamily := proxyutil.GetIPFamilyFromIP(ing.IP); ipFamily == sct.ipFamily && proxyutil.IsVIPMode(ing) {
info.loadBalancerVIPs = append(info.loadBalancerVIPs, ing.IP)
} else {
invalidIPs = append(invalidIPs, ing.IP)
} }
} }
if len(invalidIPs) > 0 {
if len(ips) > 0 { klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family",
ipFamilyMap = proxyutil.MapIPsByIPFamily(ips) "ipFamily", sct.ipFamily, "loadBalancerIngressIPs", strings.Join(invalidIPs, ", "), "service", klog.KObj(service))
if ipList, ok := ipFamilyMap[proxyutil.OtherIPFamily(sct.ipFamily)]; ok && len(ipList) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family",
"ipFamily", sct.ipFamily, "loadBalancerIngressIps", strings.Join(ipList, ", "), "service", klog.KObj(service))
}
// Create the LoadBalancerStatus with the filtered IPs
for _, ip := range ipFamilyMap[sct.ipFamily] {
info.loadBalancerStatus.Ingress = append(info.loadBalancerStatus.Ingress, v1.LoadBalancerIngress{IP: ip})
}
} }
if apiservice.NeedsHealthCheck(service) { if apiservice.NeedsHealthCheck(service) {

View File

@ -181,10 +181,10 @@ func TestServiceToServiceMap(t *testing.T) {
}), }),
expected: map[ServicePortName]*BaseServicePortInfo{ expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}} bsvcPortInfo.loadBalancerVIPs = []string{"10.1.2.4"}
}), }),
makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}} bsvcPortInfo.loadBalancerVIPs = []string{"10.1.2.4"}
}), }),
}, },
}, },
@ -204,10 +204,10 @@ func TestServiceToServiceMap(t *testing.T) {
}), }),
expected: map[ServicePortName]*BaseServicePortInfo{ expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("ns1", "only-local-load-balancer", "portx", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("ns1", "only-local-load-balancer", "portx", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}} bsvcPortInfo.loadBalancerVIPs = []string{"10.1.2.3"}
}), }),
makeServicePortName("ns1", "only-local-load-balancer", "porty", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("ns1", "only-local-load-balancer", "porty", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}} bsvcPortInfo.loadBalancerVIPs = []string{"10.1.2.3"}
}), }),
}, },
}, },
@ -315,7 +315,7 @@ func TestServiceToServiceMap(t *testing.T) {
makeServicePortName("test", "validIPv4", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("test", "validIPv4", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv4} bsvcPortInfo.externalIPs = []string{testExternalIPv4}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4} bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}} bsvcPortInfo.loadBalancerVIPs = []string{testExternalIPv4}
}), }),
}, },
}, },
@ -353,7 +353,7 @@ func TestServiceToServiceMap(t *testing.T) {
makeServicePortName("test", "validIPv6", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("test", "validIPv6", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv6} bsvcPortInfo.externalIPs = []string{testExternalIPv6}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6} bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}} bsvcPortInfo.loadBalancerVIPs = []string{testExternalIPv6}
}), }),
}, },
}, },
@ -391,7 +391,7 @@ func TestServiceToServiceMap(t *testing.T) {
makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv4} bsvcPortInfo.externalIPs = []string{testExternalIPv4}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4} bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}} bsvcPortInfo.loadBalancerVIPs = []string{testExternalIPv4}
}), }),
}, },
}, },
@ -429,7 +429,7 @@ func TestServiceToServiceMap(t *testing.T) {
makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv6} bsvcPortInfo.externalIPs = []string{testExternalIPv6}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6} bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}} bsvcPortInfo.loadBalancerVIPs = []string{testExternalIPv6}
}), }),
}, },
}, },
@ -483,7 +483,7 @@ func TestServiceToServiceMap(t *testing.T) {
svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort || svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort ||
!sets.New[string](svcInfo.externalIPs...).Equal(sets.New[string](expectedInfo.externalIPs...)) || !sets.New[string](svcInfo.externalIPs...).Equal(sets.New[string](expectedInfo.externalIPs...)) ||
!sets.New[string](svcInfo.loadBalancerSourceRanges...).Equal(sets.New[string](expectedInfo.loadBalancerSourceRanges...)) || !sets.New[string](svcInfo.loadBalancerSourceRanges...).Equal(sets.New[string](expectedInfo.loadBalancerSourceRanges...)) ||
!reflect.DeepEqual(svcInfo.loadBalancerStatus, expectedInfo.loadBalancerStatus) { !reflect.DeepEqual(svcInfo.loadBalancerVIPs, expectedInfo.loadBalancerVIPs) {
t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo) t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo)
} }
for svcKey, expectedInfo := range tc.expected { for svcKey, expectedInfo := range tc.expected {
@ -494,7 +494,7 @@ func TestServiceToServiceMap(t *testing.T) {
svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort || svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort ||
!sets.New[string](svcInfo.externalIPs...).Equal(sets.New[string](expectedInfo.externalIPs...)) || !sets.New[string](svcInfo.externalIPs...).Equal(sets.New[string](expectedInfo.externalIPs...)) ||
!sets.New[string](svcInfo.loadBalancerSourceRanges...).Equal(sets.New[string](expectedInfo.loadBalancerSourceRanges...)) || !sets.New[string](svcInfo.loadBalancerSourceRanges...).Equal(sets.New[string](expectedInfo.loadBalancerSourceRanges...)) ||
!reflect.DeepEqual(svcInfo.loadBalancerStatus, expectedInfo.loadBalancerStatus) { !reflect.DeepEqual(svcInfo.loadBalancerVIPs, expectedInfo.loadBalancerVIPs) {
t.Errorf("expected new[%v]to be %v, got %v", svcKey, expectedInfo, *svcInfo) t.Errorf("expected new[%v]to be %v, got %v", svcKey, expectedInfo, *svcInfo)
} }
} }

View File

@ -73,8 +73,8 @@ type ServicePort interface {
StickyMaxAgeSeconds() int StickyMaxAgeSeconds() int
// ExternalIPStrings returns service ExternalIPs as a string array. // ExternalIPStrings returns service ExternalIPs as a string array.
ExternalIPStrings() []string ExternalIPStrings() []string
// LoadBalancerIPStrings returns service LoadBalancerIPs as a string array. // LoadBalancerVIPStrings returns service LoadBalancerIPs which are VIP mode as a string array.
LoadBalancerIPStrings() []string LoadBalancerVIPStrings() []string
// Protocol returns service protocol. // Protocol returns service protocol.
Protocol() v1.Protocol Protocol() v1.Protocol
// LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not // LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not

View File

@ -27,9 +27,11 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilrand "k8s.io/apimachinery/pkg/util/rand" utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl" utilsysctl "k8s.io/component-helpers/node/util/sysctl"
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -182,7 +184,7 @@ func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]string {
ipFamilyMap := map[v1.IPFamily][]string{} ipFamilyMap := map[v1.IPFamily][]string{}
for _, ip := range ipStrings { for _, ip := range ipStrings {
// Handle only the valid IPs // Handle only the valid IPs
if ipFamily := getIPFamilyFromIP(ip); ipFamily != "" { if ipFamily := GetIPFamilyFromIP(ip); ipFamily != "" {
ipFamilyMap[ipFamily] = append(ipFamilyMap[ipFamily], ip) ipFamilyMap[ipFamily] = append(ipFamilyMap[ipFamily], ip)
} else { } else {
// this function is called in multiple places. All of which // this function is called in multiple places. All of which
@ -213,29 +215,26 @@ func MapCIDRsByIPFamily(cidrStrings []string) map[v1.IPFamily][]string {
return ipFamilyMap return ipFamilyMap
} }
// Returns the IP family of ipStr, or "" if ipStr can't be parsed as an IP // GetIPFamilyFromIP Returns the IP family of ipStr, or "" if ipStr can't be parsed as an IP
func getIPFamilyFromIP(ipStr string) v1.IPFamily { func GetIPFamilyFromIP(ipStr string) v1.IPFamily {
netIP := netutils.ParseIPSloppy(ipStr) return convertToV1IPFamily(netutils.IPFamilyOfString(ipStr))
if netIP == nil {
return ""
}
if netutils.IsIPv6(netIP) {
return v1.IPv6Protocol
}
return v1.IPv4Protocol
} }
// Returns the IP family of cidrStr, or "" if cidrStr can't be parsed as a CIDR // Returns the IP family of cidrStr, or "" if cidrStr can't be parsed as a CIDR
func getIPFamilyFromCIDR(cidrStr string) v1.IPFamily { func getIPFamilyFromCIDR(cidrStr string) v1.IPFamily {
_, netCIDR, err := netutils.ParseCIDRSloppy(cidrStr) return convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidrStr))
if err != nil { }
return ""
} // Convert netutils.IPFamily to v1.IPFamily
if netutils.IsIPv6CIDR(netCIDR) { func convertToV1IPFamily(ipFamily netutils.IPFamily) v1.IPFamily {
switch ipFamily {
case netutils.IPv4:
return v1.IPv4Protocol
case netutils.IPv6:
return v1.IPv6Protocol return v1.IPv6Protocol
} }
return v1.IPv4Protocol
return ""
} }
// OtherIPFamily returns the other ip family // OtherIPFamily returns the other ip family
@ -331,3 +330,13 @@ func RevertPorts(replacementPortsMap, originalPortsMap map[netutils.LocalPort]ne
} }
} }
} }
func IsVIPMode(ing v1.LoadBalancerIngress) bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.LoadBalancerIPMode) {
return true // backwards compat
}
if ing.IPMode == nil {
return true
}
return *ing.IPMode == v1.LoadBalancerIPModeVIP
}