mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #91357 from aojea/proxyFamily
kube-proxy should infer the service IP family from the ClusterIP field
This commit is contained in:
commit
1a80caef4a
@ -9,6 +9,7 @@ go_library(
|
|||||||
deps = [
|
deps = [
|
||||||
"//pkg/proxy:go_default_library",
|
"//pkg/proxy:go_default_library",
|
||||||
"//pkg/proxy/config:go_default_library",
|
"//pkg/proxy/config:go_default_library",
|
||||||
|
"//pkg/proxy/util:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/klog/v2:go_default_library",
|
"//vendor/k8s.io/klog/v2:go_default_library",
|
||||||
|
@ -19,10 +19,11 @@ package metaproxier
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/kubernetes/pkg/proxy"
|
"k8s.io/kubernetes/pkg/proxy"
|
||||||
"k8s.io/kubernetes/pkg/proxy/config"
|
"k8s.io/kubernetes/pkg/proxy/config"
|
||||||
|
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||||
|
|
||||||
utilnet "k8s.io/utils/net"
|
utilnet "k8s.io/utils/net"
|
||||||
|
|
||||||
@ -62,33 +63,41 @@ func (proxier *metaProxier) SyncLoop() {
|
|||||||
|
|
||||||
// OnServiceAdd is called whenever creation of new service object is observed.
|
// OnServiceAdd is called whenever creation of new service object is observed.
|
||||||
func (proxier *metaProxier) OnServiceAdd(service *v1.Service) {
|
func (proxier *metaProxier) OnServiceAdd(service *v1.Service) {
|
||||||
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
|
if utilproxy.ShouldSkipService(service) {
|
||||||
proxier.ipv4Proxier.OnServiceAdd(service)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if utilnet.IsIPv6String(service.Spec.ClusterIP) {
|
||||||
proxier.ipv6Proxier.OnServiceAdd(service)
|
proxier.ipv6Proxier.OnServiceAdd(service)
|
||||||
|
} else {
|
||||||
|
proxier.ipv4Proxier.OnServiceAdd(service)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnServiceUpdate is called whenever modification of an existing
|
// OnServiceUpdate is called whenever modification of an existing
|
||||||
// service object is observed.
|
// service object is observed.
|
||||||
func (proxier *metaProxier) OnServiceUpdate(oldService, service *v1.Service) {
|
func (proxier *metaProxier) OnServiceUpdate(oldService, service *v1.Service) {
|
||||||
// IPFamily is immutable, hence we only need to check on the new service
|
if utilproxy.ShouldSkipService(service) {
|
||||||
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
|
|
||||||
proxier.ipv4Proxier.OnServiceUpdate(oldService, service)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// IPFamily is immutable, hence we only need to check on the new service
|
||||||
|
if utilnet.IsIPv6String(service.Spec.ClusterIP) {
|
||||||
proxier.ipv6Proxier.OnServiceUpdate(oldService, service)
|
proxier.ipv6Proxier.OnServiceUpdate(oldService, service)
|
||||||
|
} else {
|
||||||
|
proxier.ipv4Proxier.OnServiceUpdate(oldService, service)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnServiceDelete is called whenever deletion of an existing service
|
// OnServiceDelete is called whenever deletion of an existing service
|
||||||
// object is observed.
|
// object is observed.
|
||||||
func (proxier *metaProxier) OnServiceDelete(service *v1.Service) {
|
func (proxier *metaProxier) OnServiceDelete(service *v1.Service) {
|
||||||
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
|
if utilproxy.ShouldSkipService(service) {
|
||||||
proxier.ipv4Proxier.OnServiceDelete(service)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if utilnet.IsIPv6String(service.Spec.ClusterIP) {
|
||||||
proxier.ipv6Proxier.OnServiceDelete(service)
|
proxier.ipv6Proxier.OnServiceDelete(service)
|
||||||
|
} else {
|
||||||
|
proxier.ipv4Proxier.OnServiceDelete(service)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnServiceSynced is called once all the initial event handlers were
|
// OnServiceSynced is called once all the initial event handlers were
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
@ -307,8 +307,8 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic
|
|||||||
if service == nil {
|
if service == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
|
||||||
if utilproxy.ShouldSkipService(svcName, service) {
|
if utilproxy.ShouldSkipService(service) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -322,6 +322,7 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic
|
|||||||
}
|
}
|
||||||
|
|
||||||
serviceMap := make(ServiceMap)
|
serviceMap := make(ServiceMap)
|
||||||
|
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
for i := range service.Spec.Ports {
|
for i := range service.Spec.Ports {
|
||||||
servicePort := &service.Spec.Ports[i]
|
servicePort := &service.Spec.Ports[i]
|
||||||
svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol}
|
svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol}
|
||||||
|
@ -489,12 +489,11 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
|
|||||||
if service == nil {
|
if service == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
if utilproxy.ShouldSkipService(service) {
|
||||||
if utilproxy.ShouldSkipService(svcName, service) {
|
|
||||||
klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
existingPorts := sets.NewString()
|
existingPorts := sets.NewString()
|
||||||
|
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
for i := range service.Spec.Ports {
|
for i := range service.Spec.Ports {
|
||||||
servicePort := &service.Spec.Ports[i]
|
servicePort := &service.Spec.Ports[i]
|
||||||
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
|
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
|
||||||
@ -551,12 +550,12 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
|
|||||||
if service == nil {
|
if service == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
|
||||||
if utilproxy.ShouldSkipService(svcName, service) {
|
if utilproxy.ShouldSkipService(service) {
|
||||||
klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
staleUDPServices := sets.NewString()
|
staleUDPServices := sets.NewString()
|
||||||
|
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
for i := range service.Spec.Ports {
|
for i := range service.Spec.Ports {
|
||||||
servicePort := &service.Spec.Ports[i]
|
servicePort := &service.Spec.Ports[i]
|
||||||
if existingPorts.Has(servicePort.Name) {
|
if existingPorts.Has(servicePort.Name) {
|
||||||
|
@ -35,7 +35,6 @@ go_test(
|
|||||||
"//pkg/proxy/util/testing:go_default_library",
|
"//pkg/proxy/util/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -146,15 +146,15 @@ func GetLocalAddrs() ([]net.IP, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ShouldSkipService checks if a given service should skip proxying
|
// ShouldSkipService checks if a given service should skip proxying
|
||||||
func ShouldSkipService(svcName types.NamespacedName, service *v1.Service) bool {
|
func ShouldSkipService(service *v1.Service) bool {
|
||||||
// if ClusterIP is "None" or empty, skip proxying
|
// if ClusterIP is "None" or empty, skip proxying
|
||||||
if !helper.IsServiceIPSet(service) {
|
if !helper.IsServiceIPSet(service) {
|
||||||
klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
|
klog.V(3).Infof("Skipping service %s in namespace %s due to clusterIP = %q", service.Name, service.Namespace, service.Spec.ClusterIP)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
|
// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
|
||||||
if service.Spec.Type == v1.ServiceTypeExternalName {
|
if service.Spec.Type == v1.ServiceTypeExternalName {
|
||||||
klog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
|
klog.V(3).Infof("Skipping service %s in namespace %s due to Type=ExternalName", service.Name, service.Namespace)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
@ -25,7 +25,6 @@ import (
|
|||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
fake "k8s.io/kubernetes/pkg/proxy/util/testing"
|
fake "k8s.io/kubernetes/pkg/proxy/util/testing"
|
||||||
)
|
)
|
||||||
@ -167,7 +166,6 @@ func TestIsProxyableHostname(t *testing.T) {
|
|||||||
func TestShouldSkipService(t *testing.T) {
|
func TestShouldSkipService(t *testing.T) {
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
service *v1.Service
|
service *v1.Service
|
||||||
svcName types.NamespacedName
|
|
||||||
shouldSkip bool
|
shouldSkip bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -178,7 +176,6 @@ func TestShouldSkipService(t *testing.T) {
|
|||||||
ClusterIP: v1.ClusterIPNone,
|
ClusterIP: v1.ClusterIPNone,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
svcName: types.NamespacedName{Namespace: "foo", Name: "bar"},
|
|
||||||
shouldSkip: true,
|
shouldSkip: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -189,7 +186,6 @@ func TestShouldSkipService(t *testing.T) {
|
|||||||
ClusterIP: "",
|
ClusterIP: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
svcName: types.NamespacedName{Namespace: "foo", Name: "bar"},
|
|
||||||
shouldSkip: true,
|
shouldSkip: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -201,7 +197,6 @@ func TestShouldSkipService(t *testing.T) {
|
|||||||
Type: v1.ServiceTypeExternalName,
|
Type: v1.ServiceTypeExternalName,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
svcName: types.NamespacedName{Namespace: "foo", Name: "bar"},
|
|
||||||
shouldSkip: true,
|
shouldSkip: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -213,7 +208,6 @@ func TestShouldSkipService(t *testing.T) {
|
|||||||
Type: v1.ServiceTypeClusterIP,
|
Type: v1.ServiceTypeClusterIP,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
svcName: types.NamespacedName{Namespace: "foo", Name: "bar"},
|
|
||||||
shouldSkip: false,
|
shouldSkip: false,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -225,7 +219,6 @@ func TestShouldSkipService(t *testing.T) {
|
|||||||
Type: v1.ServiceTypeNodePort,
|
Type: v1.ServiceTypeNodePort,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
svcName: types.NamespacedName{Namespace: "foo", Name: "bar"},
|
|
||||||
shouldSkip: false,
|
shouldSkip: false,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -237,13 +230,12 @@ func TestShouldSkipService(t *testing.T) {
|
|||||||
Type: v1.ServiceTypeLoadBalancer,
|
Type: v1.ServiceTypeLoadBalancer,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
svcName: types.NamespacedName{Namespace: "foo", Name: "bar"},
|
|
||||||
shouldSkip: false,
|
shouldSkip: false,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range testCases {
|
for i := range testCases {
|
||||||
skip := ShouldSkipService(testCases[i].svcName, testCases[i].service)
|
skip := ShouldSkipService(testCases[i].service)
|
||||||
if skip != testCases[i].shouldSkip {
|
if skip != testCases[i].shouldSkip {
|
||||||
t.Errorf("case %d: expect %v, got %v", i, testCases[i].shouldSkip, skip)
|
t.Errorf("case %d: expect %v, got %v", i, testCases[i].shouldSkip, skip)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user