mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-29 21:29:24 +00:00
Merge pull request #82462 from vllry/dualstack-iptables
Dualstack support for kube-proxy iptables mode
This commit is contained in:
@@ -45,7 +45,6 @@ go_library(
|
||||
srcs = [
|
||||
"graceful_termination.go",
|
||||
"ipset.go",
|
||||
"meta_proxier.go",
|
||||
"netlink.go",
|
||||
"netlink_linux.go",
|
||||
"netlink_unsupported.go",
|
||||
@@ -56,8 +55,8 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/config:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//pkg/proxy/metaproxier:go_default_library",
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/util/async:go_default_library",
|
||||
|
||||
@@ -1,229 +0,0 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ipvs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/config"
|
||||
|
||||
utilnet "k8s.io/utils/net"
|
||||
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
)
|
||||
|
||||
type metaProxier struct {
|
||||
ipv4Proxier proxy.Provider
|
||||
ipv6Proxier proxy.Provider
|
||||
// TODO(imroc): implement node handler for meta proxier.
|
||||
config.NoopNodeHandler
|
||||
}
|
||||
|
||||
// NewMetaProxier returns a dual-stack "meta-proxier". Proxier API
|
||||
// calls will be dispatched to the ProxyProvider instances depending
|
||||
// on address family.
|
||||
func NewMetaProxier(ipv4Proxier, ipv6Proxier proxy.Provider) proxy.Provider {
|
||||
return proxy.Provider(&metaProxier{
|
||||
ipv4Proxier: ipv4Proxier,
|
||||
ipv6Proxier: ipv6Proxier,
|
||||
})
|
||||
}
|
||||
|
||||
// Sync immediately synchronizes the ProxyProvider's current state to
|
||||
// proxy rules.
|
||||
func (proxier *metaProxier) Sync() {
|
||||
proxier.ipv4Proxier.Sync()
|
||||
proxier.ipv6Proxier.Sync()
|
||||
}
|
||||
|
||||
// SyncLoop runs periodic work. This is expected to run as a
|
||||
// goroutine or as the main loop of the app. It does not return.
|
||||
func (proxier *metaProxier) SyncLoop() {
|
||||
go proxier.ipv6Proxier.SyncLoop() // Use go-routine here!
|
||||
proxier.ipv4Proxier.SyncLoop() // never returns
|
||||
}
|
||||
|
||||
// OnServiceAdd is called whenever creation of new service object is observed.
|
||||
func (proxier *metaProxier) OnServiceAdd(service *v1.Service) {
|
||||
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnServiceAdd(service)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnServiceAdd(service)
|
||||
}
|
||||
|
||||
// OnServiceUpdate is called whenever modification of an existing
|
||||
// service object is observed.
|
||||
func (proxier *metaProxier) OnServiceUpdate(oldService, service *v1.Service) {
|
||||
// IPFamily is immutable, hence we only need to check on the new service
|
||||
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnServiceUpdate(oldService, service)
|
||||
return
|
||||
}
|
||||
|
||||
proxier.ipv6Proxier.OnServiceUpdate(oldService, service)
|
||||
}
|
||||
|
||||
// OnServiceDelete is called whenever deletion of an existing service
|
||||
// object is observed.
|
||||
func (proxier *metaProxier) OnServiceDelete(service *v1.Service) {
|
||||
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnServiceDelete(service)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnServiceDelete(service)
|
||||
}
|
||||
|
||||
// OnServiceSynced is called once all the initial event handlers were
|
||||
// called and the state is fully propagated to local cache.
|
||||
func (proxier *metaProxier) OnServiceSynced() {
|
||||
proxier.ipv4Proxier.OnServiceSynced()
|
||||
proxier.ipv6Proxier.OnServiceSynced()
|
||||
}
|
||||
|
||||
// OnEndpointsAdd is called whenever creation of new endpoints object
|
||||
// is observed.
|
||||
func (proxier *metaProxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
||||
ipFamily, err := endpointsIPFamily(endpoints)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to add endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
|
||||
return
|
||||
}
|
||||
if *ipFamily == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnEndpointsAdd(endpoints)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnEndpointsAdd(endpoints)
|
||||
}
|
||||
|
||||
// OnEndpointsUpdate is called whenever modification of an existing
|
||||
// endpoints object is observed.
|
||||
func (proxier *metaProxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
||||
ipFamily, err := endpointsIPFamily(endpoints)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to update endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
if *ipFamily == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnEndpointsUpdate(oldEndpoints, endpoints)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnEndpointsUpdate(oldEndpoints, endpoints)
|
||||
}
|
||||
|
||||
// OnEndpointsDelete is called whenever deletion of an existing
|
||||
// endpoints object is observed.
|
||||
func (proxier *metaProxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
|
||||
ipFamily, err := endpointsIPFamily(endpoints)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to delete endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
if *ipFamily == v1.IPv4Protocol {
|
||||
proxier.ipv4Proxier.OnEndpointsDelete(endpoints)
|
||||
return
|
||||
}
|
||||
proxier.ipv6Proxier.OnEndpointsDelete(endpoints)
|
||||
}
|
||||
|
||||
// OnEndpointsSynced is called once all the initial event handlers
|
||||
// were called and the state is fully propagated to local cache.
|
||||
func (proxier *metaProxier) OnEndpointsSynced() {
|
||||
proxier.ipv4Proxier.OnEndpointsSynced()
|
||||
proxier.ipv6Proxier.OnEndpointsSynced()
|
||||
}
|
||||
|
||||
// TODO: (khenidak) implement EndpointSlice handling
|
||||
|
||||
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
|
||||
// is observed.
|
||||
func (proxier *metaProxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
|
||||
switch endpointSlice.AddressType {
|
||||
case discovery.AddressTypeIPv4:
|
||||
proxier.ipv4Proxier.OnEndpointSliceAdd(endpointSlice)
|
||||
case discovery.AddressTypeIPv6:
|
||||
proxier.ipv6Proxier.OnEndpointSliceAdd(endpointSlice)
|
||||
default:
|
||||
klog.V(4).Infof("EndpointSlice address type not supported by kube-proxy: %s", endpointSlice.AddressType)
|
||||
}
|
||||
}
|
||||
|
||||
// OnEndpointSliceUpdate is called whenever modification of an existing endpoint
|
||||
// slice object is observed.
|
||||
func (proxier *metaProxier) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) {
|
||||
switch newEndpointSlice.AddressType {
|
||||
case discovery.AddressTypeIPv4:
|
||||
proxier.ipv4Proxier.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice)
|
||||
case discovery.AddressTypeIPv6:
|
||||
proxier.ipv6Proxier.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice)
|
||||
default:
|
||||
klog.V(4).Infof("EndpointSlice address type not supported by kube-proxy: %s", newEndpointSlice.AddressType)
|
||||
}
|
||||
}
|
||||
|
||||
// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
|
||||
// object is observed.
|
||||
func (proxier *metaProxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
|
||||
switch endpointSlice.AddressType {
|
||||
case discovery.AddressTypeIPv4:
|
||||
proxier.ipv4Proxier.OnEndpointSliceDelete(endpointSlice)
|
||||
case discovery.AddressTypeIPv6:
|
||||
proxier.ipv6Proxier.OnEndpointSliceDelete(endpointSlice)
|
||||
default:
|
||||
klog.V(4).Infof("EndpointSlice address type not supported by kube-proxy: %s", endpointSlice.AddressType)
|
||||
}
|
||||
}
|
||||
|
||||
// OnEndpointSlicesSynced is called once all the initial event handlers were
|
||||
// called and the state is fully propagated to local cache.
|
||||
func (proxier *metaProxier) OnEndpointSlicesSynced() {
|
||||
proxier.ipv4Proxier.OnEndpointSlicesSynced()
|
||||
proxier.ipv6Proxier.OnEndpointSlicesSynced()
|
||||
}
|
||||
|
||||
// endpointsIPFamily that returns IPFamily of endpoints or error if
|
||||
// failed to identify the IP family.
|
||||
func endpointsIPFamily(endpoints *v1.Endpoints) (*v1.IPFamily, error) {
|
||||
if len(endpoints.Subsets) == 0 {
|
||||
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no subsets)")
|
||||
}
|
||||
|
||||
// we only need to work with subset [0],endpoint controller
|
||||
// ensures that endpoints selected are of the same family.
|
||||
subset := endpoints.Subsets[0]
|
||||
if len(subset.Addresses) == 0 {
|
||||
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no addresses)")
|
||||
}
|
||||
// same apply on addresses
|
||||
address := subset.Addresses[0]
|
||||
if len(address.IP) == 0 {
|
||||
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (address has no ip)")
|
||||
}
|
||||
|
||||
ipv4 := v1.IPv4Protocol
|
||||
ipv6 := v1.IPv6Protocol
|
||||
if utilnet.IsIPv6String(address.IP) {
|
||||
return &ipv6, nil
|
||||
}
|
||||
|
||||
return &ipv4, nil
|
||||
}
|
||||
@@ -46,6 +46,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
"k8s.io/kubernetes/pkg/proxy/metaproxier"
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
@@ -532,7 +533,7 @@ func NewDualStackProxier(
|
||||
|
||||
// Return a meta-proxier that dispatch calls between the two
|
||||
// single-stack proxier instances
|
||||
return NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
|
||||
return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
|
||||
}
|
||||
|
||||
func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
|
||||
|
||||
Reference in New Issue
Block a user