Proxy changes for IP mode field

This commit is contained in:
Aohan Yang 2023-07-06 16:48:52 +08:00
parent 29a6705dab
commit 86b1f095ca
10 changed files with 318 additions and 64 deletions

View File

@ -50,7 +50,7 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP) conntrackCleanupServiceIPs.Insert(extIP)
} }
for _, lbIP := range svcInfo.LoadBalancerIPStrings() { for _, lbIP := range svcInfo.LoadBalancerVIPStrings() {
conntrackCleanupServiceIPs.Insert(lbIP) conntrackCleanupServiceIPs.Insert(lbIP)
} }
nodePort := svcInfo.NodePort() nodePort := svcInfo.NodePort()
@ -100,7 +100,7 @@ func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap pro
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
} }
} }
for _, lbIP := range svcInfo.LoadBalancerIPStrings() { for _, lbIP := range svcInfo.LoadBalancerVIPStrings() {
err := ClearEntriesForNAT(exec, lbIP, endpointIP, v1.ProtocolUDP) err := ClearEntriesForNAT(exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)

View File

@ -1024,7 +1024,7 @@ func (proxier *Proxier) syncProxyRules() {
// create a firewall chain. // create a firewall chain.
loadBalancerTrafficChain := externalTrafficChain loadBalancerTrafficChain := externalTrafficChain
fwChain := svcInfo.firewallChainName fwChain := svcInfo.firewallChainName
usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0 usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
if usesFWChain { if usesFWChain {
activeNATChains[fwChain] = true activeNATChains[fwChain] = true
loadBalancerTrafficChain = fwChain loadBalancerTrafficChain = fwChain
@ -1116,7 +1116,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, lbip := range svcInfo.LoadBalancerIPStrings() { for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
if hasEndpoints { if hasEndpoints {
natRules.Write( natRules.Write(
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
@ -1141,7 +1141,7 @@ func (proxier *Proxier) syncProxyRules() {
// Either no endpoints at all (REJECT) or no endpoints for // Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get short-circuited // external traffic (DROP anything that didn't get short-circuited
// by the EXT chain.) // by the EXT chain.)
for _, lbip := range svcInfo.LoadBalancerIPStrings() { for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
filterRules.Write( filterRules.Write(
"-A", string(kubeExternalServicesChain), "-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", externalTrafficFilterComment, "-m", "comment", "--comment", externalTrafficFilterComment,
@ -1319,7 +1319,7 @@ func (proxier *Proxier) syncProxyRules() {
// will loop back with the source IP set to the VIP. We // will loop back with the source IP set to the VIP. We
// need the following rules to allow requests from this node. // need the following rules to allow requests from this node.
if allowFromNode { if allowFromNode {
for _, lbip := range svcInfo.LoadBalancerIPStrings() { for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
natRules.Write( natRules.Write(
args, args,
"-s", lbip, "-s", lbip,

View File

@ -38,8 +38,11 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack" "k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/metrics"
@ -8276,3 +8279,128 @@ func TestNoEndpointsMetric(t *testing.T) {
}) })
} }
} }
func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
ipModeProxy := v1.LoadBalancerIPModeProxy
ipModeVIP := v1.LoadBalancerIPModeVIP
testCases := []struct {
name string
ipModeEnabled bool
svcIP string
svcLBIP string
ipMode *v1.LoadBalancerIPMode
expectedRule bool
}{
/* LoadBalancerIPMode disabled */
{
name: "LoadBalancerIPMode disabled, ipMode Proxy",
ipModeEnabled: false,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedRule: true,
},
{
name: "LoadBalancerIPMode disabled, ipMode VIP",
ipModeEnabled: false,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedRule: true,
},
{
name: "LoadBalancerIPMode disabled, ipMode nil",
ipModeEnabled: false,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedRule: true,
},
/* LoadBalancerIPMode enabled */
{
name: "LoadBalancerIPMode enabled, ipMode Proxy",
ipModeEnabled: true,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedRule: false,
},
{
name: "LoadBalancerIPMode enabled, ipMode VIP",
ipModeEnabled: true,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedRule: true,
},
{
name: "LoadBalancerIPMode enabled, ipMode nil",
ipModeEnabled: true,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedRule: true,
},
}
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = testCase.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: testCase.svcLBIP,
IPMode: testCase.ipMode,
}}
}),
)
tcpProtocol := v1.ProtocolTCP
populateEndpointSlices(fp,
makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"10.180.0.1"},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String("p80"),
Port: pointer.Int32(80),
Protocol: &tcpProtocol,
}}
}),
)
fp.syncProxyRules()
c, _ := ipt.Dump.GetChain(utiliptables.TableNAT, kubeServicesChain)
ruleExists := false
for _, r := range c.Rules {
if r.DestinationAddress != nil && r.DestinationAddress.Value == testCase.svcLBIP {
ruleExists = true
}
}
if ruleExists != testCase.expectedRule {
t.Errorf("unexpected rule for %s", testCase.svcLBIP)
}
})
}
}

View File

@ -1172,7 +1172,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, ingress := range svcInfo.LoadBalancerIPStrings() { for _, ingress := range svcInfo.LoadBalancerVIPStrings() {
// ipset call // ipset call
entry = &utilipset.Entry{ entry = &utilipset.Entry{
IP: ingress, IP: ingress,

View File

@ -36,7 +36,10 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/healthcheck"
utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset" utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
@ -5836,3 +5839,123 @@ func TestDismissLocalhostRuleExist(t *testing.T) {
}) })
} }
} }
func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
ipModeProxy := v1.LoadBalancerIPModeProxy
ipModeVIP := v1.LoadBalancerIPModeVIP
testCases := []struct {
name string
ipModeEnabled bool
svcIP string
svcLBIP string
ipMode *v1.LoadBalancerIPMode
expectedServices int
}{
/* LoadBalancerIPMode disabled */
{
name: "LoadBalancerIPMode disabled, ipMode Proxy",
ipModeEnabled: false,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedServices: 2,
},
{
name: "LoadBalancerIPMode disabled, ipMode VIP",
ipModeEnabled: false,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedServices: 2,
},
{
name: "LoadBalancerIPMode disabled, ipMode nil",
ipModeEnabled: false,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedServices: 2,
},
/* LoadBalancerIPMode enabled */
{
name: "LoadBalancerIPMode enabled, ipMode Proxy",
ipModeEnabled: true,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedServices: 1,
},
{
name: "LoadBalancerIPMode enabled, ipMode VIP",
ipModeEnabled: true,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedServices: 2,
},
{
name: "LoadBalancerIPMode enabled, ipMode nil",
ipModeEnabled: true,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedServices: 2,
},
}
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)()
_, fp := buildFakeProxier()
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = testCase.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: testCase.svcLBIP,
IPMode: testCase.ipMode,
}}
}),
)
tcpProtocol := v1.ProtocolTCP
makeEndpointSliceMap(fp,
makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"10.180.0.1"},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String("p80"),
Port: pointer.Int32(80),
Protocol: &tcpProtocol,
}}
}),
)
fp.syncProxyRules()
services, err := fp.ipvs.GetVirtualServers()
if err != nil {
t.Errorf("Failed to get ipvs services, err: %v", err)
}
if len(services) != testCase.expectedServices {
t.Errorf("Expected %d ipvs services, got %d", testCase.expectedServices, len(services))
}
})
}
}

View File

@ -44,7 +44,7 @@ type BaseServicePortInfo struct {
port int port int
protocol v1.Protocol protocol v1.Protocol
nodePort int nodePort int
loadBalancerStatus v1.LoadBalancerStatus loadBalancerVIPs []string
sessionAffinityType v1.ServiceAffinity sessionAffinityType v1.ServiceAffinity
stickyMaxAgeSeconds int stickyMaxAgeSeconds int
externalIPs []string externalIPs []string
@ -108,13 +108,9 @@ func (bsvcPortInfo *BaseServicePortInfo) ExternalIPStrings() []string {
return bsvcPortInfo.externalIPs return bsvcPortInfo.externalIPs
} }
// LoadBalancerIPStrings is part of ServicePort interface. // LoadBalancerVIPStrings is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerIPStrings() []string { func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerVIPStrings() []string {
var ips []string return bsvcPortInfo.loadBalancerVIPs
for _, ing := range bsvcPortInfo.loadBalancerStatus.Ingress {
ips = append(ips, ing.IP)
}
return ips
} }
// ExternalPolicyLocal is part of ServicePort interface. // ExternalPolicyLocal is part of ServicePort interface.
@ -139,7 +135,7 @@ func (bsvcPortInfo *BaseServicePortInfo) HintsAnnotation() string {
// ExternallyAccessible is part of ServicePort interface. // ExternallyAccessible is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ExternallyAccessible() bool { func (bsvcPortInfo *BaseServicePortInfo) ExternallyAccessible() bool {
return bsvcPortInfo.nodePort != 0 || len(bsvcPortInfo.loadBalancerStatus.Ingress) != 0 || len(bsvcPortInfo.externalIPs) != 0 return bsvcPortInfo.nodePort != 0 || len(bsvcPortInfo.loadBalancerVIPs) != 0 || len(bsvcPortInfo.externalIPs) != 0
} }
// UsesClusterEndpoints is part of ServicePort interface. // UsesClusterEndpoints is part of ServicePort interface.
@ -211,25 +207,21 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
"ipFamily", sct.ipFamily, "loadBalancerSourceRanges", strings.Join(cidrs, ", "), "service", klog.KObj(service)) "ipFamily", sct.ipFamily, "loadBalancerSourceRanges", strings.Join(cidrs, ", "), "service", klog.KObj(service))
} }
// Obtain Load Balancer Ingress IPs // Obtain Load Balancer Ingress
var ips []string var invalidIPs []string
for _, ing := range service.Status.LoadBalancer.Ingress { for _, ing := range service.Status.LoadBalancer.Ingress {
if ing.IP != "" { if ing.IP == "" {
ips = append(ips, ing.IP) continue
}
if ipFamily := proxyutil.GetIPFamilyFromIP(ing.IP); ipFamily == sct.ipFamily && proxyutil.IsVIPMode(ing) {
info.loadBalancerVIPs = append(info.loadBalancerVIPs, ing.IP)
} else {
invalidIPs = append(invalidIPs, ing.IP)
} }
} }
if len(invalidIPs) > 0 {
if len(ips) > 0 {
ipFamilyMap = proxyutil.MapIPsByIPFamily(ips)
if ipList, ok := ipFamilyMap[proxyutil.OtherIPFamily(sct.ipFamily)]; ok && len(ipList) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family", klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family",
"ipFamily", sct.ipFamily, "loadBalancerIngressIps", strings.Join(ipList, ", "), "service", klog.KObj(service)) "ipFamily", sct.ipFamily, "loadBalancerIngressIPs", strings.Join(invalidIPs, ", "), "service", klog.KObj(service))
}
// Create the LoadBalancerStatus with the filtered IPs
for _, ip := range ipFamilyMap[sct.ipFamily] {
info.loadBalancerStatus.Ingress = append(info.loadBalancerStatus.Ingress, v1.LoadBalancerIngress{IP: ip})
}
} }
if apiservice.NeedsHealthCheck(service) { if apiservice.NeedsHealthCheck(service) {

View File

@ -181,10 +181,10 @@ func TestServiceToServiceMap(t *testing.T) {
}), }),
expected: map[ServicePortName]*BaseServicePortInfo{ expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("ns1", "load-balancer", "port3", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}} bsvcPortInfo.loadBalancerVIPs = []string{"10.1.2.4"}
}), }),
makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("ns1", "load-balancer", "port4", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.4"}} bsvcPortInfo.loadBalancerVIPs = []string{"10.1.2.4"}
}), }),
}, },
}, },
@ -204,10 +204,10 @@ func TestServiceToServiceMap(t *testing.T) {
}), }),
expected: map[ServicePortName]*BaseServicePortInfo{ expected: map[ServicePortName]*BaseServicePortInfo{
makeServicePortName("ns1", "only-local-load-balancer", "portx", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("ns1", "only-local-load-balancer", "portx", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}} bsvcPortInfo.loadBalancerVIPs = []string{"10.1.2.3"}
}), }),
makeServicePortName("ns1", "only-local-load-balancer", "porty", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("ns1", "only-local-load-balancer", "porty", v1.ProtocolUDP): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: "10.1.2.3"}} bsvcPortInfo.loadBalancerVIPs = []string{"10.1.2.3"}
}), }),
}, },
}, },
@ -315,7 +315,7 @@ func TestServiceToServiceMap(t *testing.T) {
makeServicePortName("test", "validIPv4", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("test", "validIPv4", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv4} bsvcPortInfo.externalIPs = []string{testExternalIPv4}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4} bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}} bsvcPortInfo.loadBalancerVIPs = []string{testExternalIPv4}
}), }),
}, },
}, },
@ -353,7 +353,7 @@ func TestServiceToServiceMap(t *testing.T) {
makeServicePortName("test", "validIPv6", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("test", "validIPv6", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv6} bsvcPortInfo.externalIPs = []string{testExternalIPv6}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6} bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}} bsvcPortInfo.loadBalancerVIPs = []string{testExternalIPv6}
}), }),
}, },
}, },
@ -391,7 +391,7 @@ func TestServiceToServiceMap(t *testing.T) {
makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("test", "filterIPv6InIPV4Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv4, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv4} bsvcPortInfo.externalIPs = []string{testExternalIPv4}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4} bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv4}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv4}} bsvcPortInfo.loadBalancerVIPs = []string{testExternalIPv4}
}), }),
}, },
}, },
@ -429,7 +429,7 @@ func TestServiceToServiceMap(t *testing.T) {
makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) { makeServicePortName("test", "filterIPv4InIPV6Mode", "testPort", v1.ProtocolTCP): makeTestServiceInfo(testClusterIPv6, 12345, "TCP", 0, func(bsvcPortInfo *BaseServicePortInfo) {
bsvcPortInfo.externalIPs = []string{testExternalIPv6} bsvcPortInfo.externalIPs = []string{testExternalIPv6}
bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6} bsvcPortInfo.loadBalancerSourceRanges = []string{testSourceRangeIPv6}
bsvcPortInfo.loadBalancerStatus.Ingress = []v1.LoadBalancerIngress{{IP: testExternalIPv6}} bsvcPortInfo.loadBalancerVIPs = []string{testExternalIPv6}
}), }),
}, },
}, },
@ -483,7 +483,7 @@ func TestServiceToServiceMap(t *testing.T) {
svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort || svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort ||
!sets.New[string](svcInfo.externalIPs...).Equal(sets.New[string](expectedInfo.externalIPs...)) || !sets.New[string](svcInfo.externalIPs...).Equal(sets.New[string](expectedInfo.externalIPs...)) ||
!sets.New[string](svcInfo.loadBalancerSourceRanges...).Equal(sets.New[string](expectedInfo.loadBalancerSourceRanges...)) || !sets.New[string](svcInfo.loadBalancerSourceRanges...).Equal(sets.New[string](expectedInfo.loadBalancerSourceRanges...)) ||
!reflect.DeepEqual(svcInfo.loadBalancerStatus, expectedInfo.loadBalancerStatus) { !reflect.DeepEqual(svcInfo.loadBalancerVIPs, expectedInfo.loadBalancerVIPs) {
t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo) t.Errorf("[%s] expected new[%v]to be %v, got %v", tc.desc, svcKey, expectedInfo, *svcInfo)
} }
for svcKey, expectedInfo := range tc.expected { for svcKey, expectedInfo := range tc.expected {
@ -494,7 +494,7 @@ func TestServiceToServiceMap(t *testing.T) {
svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort || svcInfo.healthCheckNodePort != expectedInfo.healthCheckNodePort ||
!sets.New[string](svcInfo.externalIPs...).Equal(sets.New[string](expectedInfo.externalIPs...)) || !sets.New[string](svcInfo.externalIPs...).Equal(sets.New[string](expectedInfo.externalIPs...)) ||
!sets.New[string](svcInfo.loadBalancerSourceRanges...).Equal(sets.New[string](expectedInfo.loadBalancerSourceRanges...)) || !sets.New[string](svcInfo.loadBalancerSourceRanges...).Equal(sets.New[string](expectedInfo.loadBalancerSourceRanges...)) ||
!reflect.DeepEqual(svcInfo.loadBalancerStatus, expectedInfo.loadBalancerStatus) { !reflect.DeepEqual(svcInfo.loadBalancerVIPs, expectedInfo.loadBalancerVIPs) {
t.Errorf("expected new[%v]to be %v, got %v", svcKey, expectedInfo, *svcInfo) t.Errorf("expected new[%v]to be %v, got %v", svcKey, expectedInfo, *svcInfo)
} }
} }

View File

@ -73,8 +73,8 @@ type ServicePort interface {
StickyMaxAgeSeconds() int StickyMaxAgeSeconds() int
// ExternalIPStrings returns service ExternalIPs as a string array. // ExternalIPStrings returns service ExternalIPs as a string array.
ExternalIPStrings() []string ExternalIPStrings() []string
// LoadBalancerIPStrings returns service LoadBalancerIPs as a string array. // LoadBalancerVIPStrings returns service LoadBalancerIPs which are VIP mode as a string array.
LoadBalancerIPStrings() []string LoadBalancerVIPStrings() []string
// Protocol returns service protocol. // Protocol returns service protocol.
Protocol() v1.Protocol Protocol() v1.Protocol
// LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not // LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not

View File

@ -27,9 +27,11 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilrand "k8s.io/apimachinery/pkg/util/rand" utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl" utilsysctl "k8s.io/component-helpers/node/util/sysctl"
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -182,7 +184,7 @@ func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]string {
ipFamilyMap := map[v1.IPFamily][]string{} ipFamilyMap := map[v1.IPFamily][]string{}
for _, ip := range ipStrings { for _, ip := range ipStrings {
// Handle only the valid IPs // Handle only the valid IPs
if ipFamily := getIPFamilyFromIP(ip); ipFamily != "" { if ipFamily := GetIPFamilyFromIP(ip); ipFamily != v1.IPFamilyUnknown {
ipFamilyMap[ipFamily] = append(ipFamilyMap[ipFamily], ip) ipFamilyMap[ipFamily] = append(ipFamilyMap[ipFamily], ip)
} else { } else {
// this function is called in multiple places. All of which // this function is called in multiple places. All of which
@ -204,7 +206,7 @@ func MapCIDRsByIPFamily(cidrStrings []string) map[v1.IPFamily][]string {
ipFamilyMap := map[v1.IPFamily][]string{} ipFamilyMap := map[v1.IPFamily][]string{}
for _, cidr := range cidrStrings { for _, cidr := range cidrStrings {
// Handle only the valid CIDRs // Handle only the valid CIDRs
if ipFamily := getIPFamilyFromCIDR(cidr); ipFamily != "" { if ipFamily := getIPFamilyFromCIDR(cidr); ipFamily != v1.IPFamilyUnknown {
ipFamilyMap[ipFamily] = append(ipFamilyMap[ipFamily], cidr) ipFamilyMap[ipFamily] = append(ipFamilyMap[ipFamily], cidr)
} else { } else {
klog.ErrorS(nil, "Skipping invalid CIDR", "cidr", cidr) klog.ErrorS(nil, "Skipping invalid CIDR", "cidr", cidr)
@ -213,29 +215,26 @@ func MapCIDRsByIPFamily(cidrStrings []string) map[v1.IPFamily][]string {
return ipFamilyMap return ipFamilyMap
} }
// Returns the IP family of ipStr, or "" if ipStr can't be parsed as an IP // GetIPFamilyFromIP Returns the IP family of ipStr, or IPFamilyUnknown if ipStr can't be parsed as an IP
func getIPFamilyFromIP(ipStr string) v1.IPFamily { func GetIPFamilyFromIP(ipStr string) v1.IPFamily {
netIP := netutils.ParseIPSloppy(ipStr) return convertToV1IPFamily(netutils.IPFamilyOfString(ipStr))
if netIP == nil {
return ""
} }
if netutils.IsIPv6(netIP) { // Returns the IP family of cidrStr, or IPFamilyUnknown if cidrStr can't be parsed as a CIDR
return v1.IPv6Protocol
}
return v1.IPv4Protocol
}
// Returns the IP family of cidrStr, or "" if cidrStr can't be parsed as a CIDR
func getIPFamilyFromCIDR(cidrStr string) v1.IPFamily { func getIPFamilyFromCIDR(cidrStr string) v1.IPFamily {
_, netCIDR, err := netutils.ParseCIDRSloppy(cidrStr) return convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidrStr))
if err != nil {
return ""
} }
if netutils.IsIPv6CIDR(netCIDR) {
// Convert netutils.IPFamily to v1.IPFamily
func convertToV1IPFamily(ipFamily netutils.IPFamily) v1.IPFamily {
switch ipFamily {
case netutils.IPv4:
return v1.IPv4Protocol
case netutils.IPv6:
return v1.IPv6Protocol return v1.IPv6Protocol
} }
return v1.IPv4Protocol
return v1.IPFamilyUnknown
} }
// OtherIPFamily returns the other ip family // OtherIPFamily returns the other ip family
@ -331,3 +330,13 @@ func RevertPorts(replacementPortsMap, originalPortsMap map[netutils.LocalPort]ne
} }
} }
} }
func IsVIPMode(ing v1.LoadBalancerIngress) bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.LoadBalancerIPMode) {
return true // backwards compat
}
if ing.IPMode == nil {
return true
}
return *ing.IPMode == v1.LoadBalancerIPModeVIP
}

View File

@ -4718,6 +4718,8 @@ const (
IPv4Protocol IPFamily = "IPv4" IPv4Protocol IPFamily = "IPv4"
// IPv6Protocol indicates that this IP is IPv6 protocol // IPv6Protocol indicates that this IP is IPv6 protocol
IPv6Protocol IPFamily = "IPv6" IPv6Protocol IPFamily = "IPv6"
// IPFamilyUnknown indicates that this IP is unknown protocol
IPFamilyUnknown IPFamily = ""
) )
// IPFamilyPolicy represents the dual-stack-ness requested or required by a Service // IPFamilyPolicy represents the dual-stack-ness requested or required by a Service