Merge pull request #123005 from danwinship/minor-proxy-cleanup

Minor proxy cleanup
This commit is contained in:
Kubernetes Prow Robot 2024-01-28 08:44:38 -08:00 committed by GitHub
commit 27ad20db35
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 348 additions and 365 deletions

View File

@ -22,10 +22,9 @@ import (
"strconv" "strconv"
"strings" "strings"
"k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/mount-utils" "k8s.io/mount-utils"
"k8s.io/component-helpers/node/util/sysctl"
) )
// Conntracker is an interface to the global sysctl. Descriptions of the various // Conntracker is an interface to the global sysctl. Descriptions of the various

View File

@ -20,9 +20,9 @@ limitations under the License.
package app package app
import ( import (
"k8s.io/kubernetes/pkg/windows/service"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/windows/service"
) )
const ( const (

View File

@ -28,9 +28,6 @@ import (
"strings" "strings"
"time" "time"
"k8s.io/kubernetes/pkg/features"
utilnode "k8s.io/kubernetes/pkg/util/node"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/pflag" "github.com/spf13/pflag"
@ -73,6 +70,7 @@ import (
"k8s.io/kube-proxy/config/v1alpha1" "k8s.io/kube-proxy/config/v1alpha1"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cluster/ports" "k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/apis" "k8s.io/kubernetes/pkg/proxy/apis"
@ -85,6 +83,7 @@ import (
proxyutil "k8s.io/kubernetes/pkg/proxy/util" proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/filesystem" "k8s.io/kubernetes/pkg/util/filesystem"
utilflag "k8s.io/kubernetes/pkg/util/flag" utilflag "k8s.io/kubernetes/pkg/util/flag"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/oom"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"

View File

@ -31,19 +31,18 @@ import (
"github.com/google/cadvisor/machine" "github.com/google/cadvisor/machine"
"github.com/google/cadvisor/utils/sysfs" "github.com/google/cadvisor/utils/sysfs"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/fields"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
toolswatch "k8s.io/client-go/tools/watch" toolswatch "k8s.io/client-go/tools/watch"
utilsysctl "k8s.io/component-helpers/node/util/sysctl" utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
@ -57,8 +56,6 @@ import (
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/utils/exec" "k8s.io/utils/exec"
"k8s.io/klog/v2"
) )
// timeoutForNodePodCIDR is the time to wait for allocators to assign a PodCIDR to the // timeoutForNodePodCIDR is the time to wait for allocators to assign a PodCIDR to the

View File

@ -31,6 +31,7 @@ import (
"time" "time"
"github.com/spf13/pflag" "github.com/spf13/pflag"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"

View File

@ -25,7 +25,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"strconv"
// Enable pprof HTTP handlers. // Enable pprof HTTP handlers.
_ "net/http/pprof" _ "net/http/pprof"
@ -83,11 +82,6 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
if initOnly { if initOnly {
return nil, fmt.Errorf("--init-only is not implemented on Windows") return nil, fmt.Errorf("--init-only is not implemented on Windows")
} }
var healthzPort int
if len(config.HealthzBindAddress) > 0 {
_, port, _ := net.SplitHostPort(config.HealthzBindAddress)
healthzPort, _ = strconv.Atoi(port)
}
var proxier proxy.Provider var proxier proxy.Provider
var err error var err error
@ -96,26 +90,24 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
proxier, err = winkernel.NewDualStackProxier( proxier, err = winkernel.NewDualStackProxier(
config.IPTables.SyncPeriod.Duration, config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration,
config.ClusterCIDR,
s.Hostname, s.Hostname,
s.NodeIPs, s.NodeIPs,
s.Recorder, s.Recorder,
s.HealthzServer, s.HealthzServer,
config.HealthzBindAddress,
config.Winkernel, config.Winkernel,
healthzPort,
) )
} else { } else {
proxier, err = winkernel.NewProxier( proxier, err = winkernel.NewProxier(
s.PrimaryIPFamily, s.PrimaryIPFamily,
config.IPTables.SyncPeriod.Duration, config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration,
config.ClusterCIDR,
s.Hostname, s.Hostname,
s.NodeIPs[s.PrimaryIPFamily], s.NodeIPs[s.PrimaryIPFamily],
s.Recorder, s.Recorder,
s.HealthzServer, s.HealthzServer,
config.HealthzBindAddress,
config.Winkernel, config.Winkernel,
healthzPort,
) )
} }
if err != nil { if err != nil {

View File

@ -95,47 +95,42 @@ const (
const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet" const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
const sysctlNFConntrackTCPBeLiberal = "net/netfilter/nf_conntrack_tcp_be_liberal" const sysctlNFConntrackTCPBeLiberal = "net/netfilter/nf_conntrack_tcp_be_liberal"
// internal struct for string service information // NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies.
type servicePortInfo struct { func NewDualStackProxier(
*proxy.BaseServicePortInfo ipt [2]utiliptables.Interface,
// The following fields are computed and stored for performance reasons. sysctl utilsysctl.Interface,
nameString string exec utilexec.Interface,
clusterPolicyChainName utiliptables.Chain syncPeriod time.Duration,
localPolicyChainName utiliptables.Chain minSyncPeriod time.Duration,
firewallChainName utiliptables.Chain masqueradeAll bool,
externalChainName utiliptables.Chain localhostNodePorts bool,
} masqueradeBit int,
localDetectors [2]proxyutiliptables.LocalTrafficDetector,
// returns a new proxy.ServicePort which abstracts a serviceInfo hostname string,
func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort { nodeIPs map[v1.IPFamily]net.IP,
svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo} recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
// Store the following for performance reasons. nodePortAddresses []string,
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} initOnly bool,
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} ) (proxy.Provider, error) {
protocol := strings.ToLower(string(svcPort.Protocol())) // Create an ipv4 instance of the single-stack proxier
svcPort.nameString = svcPortName.String() ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], sysctl,
svcPort.clusterPolicyChainName = servicePortPolicyClusterChain(svcPort.nameString, protocol) exec, syncPeriod, minSyncPeriod, masqueradeAll, localhostNodePorts, masqueradeBit, localDetectors[0], hostname,
svcPort.localPolicyChainName = servicePortPolicyLocalChainName(svcPort.nameString, protocol) nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
svcPort.firewallChainName = serviceFirewallChainName(svcPort.nameString, protocol) if err != nil {
svcPort.externalChainName = serviceExternalChainName(svcPort.nameString, protocol) return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
return svcPort
}
// internal struct for endpoints information
type endpointInfo struct {
*proxy.BaseEndpointInfo
ChainName utiliptables.Chain
}
// returns a new proxy.Endpoint which abstracts a endpointInfo
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
return &endpointInfo{
BaseEndpointInfo: baseInfo,
ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()),
} }
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], sysctl,
exec, syncPeriod, minSyncPeriod, masqueradeAll, false, masqueradeBit, localDetectors[1], hostname,
nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
}
if initOnly {
return nil, nil
}
return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
} }
// Proxier is an iptables based proxy for connections between a localhost:lport // Proxier is an iptables based proxy for connections between a localhost:lport
@ -322,42 +317,47 @@ func NewProxier(ipFamily v1.IPFamily,
return proxier, nil return proxier, nil
} }
// NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies. // internal struct for string service information
func NewDualStackProxier( type servicePortInfo struct {
ipt [2]utiliptables.Interface, *proxy.BaseServicePortInfo
sysctl utilsysctl.Interface, // The following fields are computed and stored for performance reasons.
exec utilexec.Interface, nameString string
syncPeriod time.Duration, clusterPolicyChainName utiliptables.Chain
minSyncPeriod time.Duration, localPolicyChainName utiliptables.Chain
masqueradeAll bool, firewallChainName utiliptables.Chain
localhostNodePorts bool, externalChainName utiliptables.Chain
masqueradeBit int, }
localDetectors [2]proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
nodePortAddresses []string,
initOnly bool,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], sysctl,
exec, syncPeriod, minSyncPeriod, masqueradeAll, localhostNodePorts, masqueradeBit, localDetectors[0], hostname,
nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], sysctl, // returns a new proxy.ServicePort which abstracts a serviceInfo
exec, syncPeriod, minSyncPeriod, masqueradeAll, false, masqueradeBit, localDetectors[1], hostname, func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly) svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err) // Store the following for performance reasons.
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
protocol := strings.ToLower(string(svcPort.Protocol()))
svcPort.nameString = svcPortName.String()
svcPort.clusterPolicyChainName = servicePortPolicyClusterChain(svcPort.nameString, protocol)
svcPort.localPolicyChainName = servicePortPolicyLocalChainName(svcPort.nameString, protocol)
svcPort.firewallChainName = serviceFirewallChainName(svcPort.nameString, protocol)
svcPort.externalChainName = serviceExternalChainName(svcPort.nameString, protocol)
return svcPort
}
// internal struct for endpoints information
type endpointInfo struct {
*proxy.BaseEndpointInfo
ChainName utiliptables.Chain
}
// returns a new proxy.Endpoint which abstracts a endpointInfo
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
return &endpointInfo{
BaseEndpointInfo: baseInfo,
ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()),
} }
if initOnly {
return nil, nil
}
return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
} }
type iptablesJumpChain struct { type iptablesJumpChain struct {

View File

@ -98,112 +98,6 @@ const (
defaultDummyDevice = "kube-ipvs0" defaultDummyDevice = "kube-ipvs0"
) )
// iptablesJumpChain is tables of iptables chains that ipvs proxier used to install iptables or cleanup iptables.
// `to` is the iptables chain we want to operate.
// `from` is the source iptables chain
var iptablesJumpChain = []struct {
table utiliptables.Table
from utiliptables.Chain
to utiliptables.Chain
comment string
}{
{utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"},
{utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
{utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
{utiliptables.TableFilter, utiliptables.ChainForward, kubeForwardChain, "kubernetes forwarding rules"},
{utiliptables.TableFilter, utiliptables.ChainInput, kubeNodePortChain, "kubernetes health check rules"},
{utiliptables.TableFilter, utiliptables.ChainInput, kubeProxyFirewallChain, "kube-proxy firewall rules"},
{utiliptables.TableFilter, utiliptables.ChainForward, kubeProxyFirewallChain, "kube-proxy firewall rules"},
{utiliptables.TableFilter, utiliptables.ChainInput, kubeIPVSFilterChain, "kubernetes ipvs access filter"},
{utiliptables.TableFilter, utiliptables.ChainOutput, kubeIPVSOutFilterChain, "kubernetes ipvs access filter"},
}
var iptablesChains = []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableNAT, kubeServicesChain},
{utiliptables.TableNAT, kubePostroutingChain},
{utiliptables.TableNAT, kubeNodePortChain},
{utiliptables.TableNAT, kubeLoadBalancerChain},
{utiliptables.TableNAT, kubeMarkMasqChain},
{utiliptables.TableFilter, kubeForwardChain},
{utiliptables.TableFilter, kubeNodePortChain},
{utiliptables.TableFilter, kubeProxyFirewallChain},
{utiliptables.TableFilter, kubeSourceRangesFirewallChain},
{utiliptables.TableFilter, kubeIPVSFilterChain},
{utiliptables.TableFilter, kubeIPVSOutFilterChain},
}
var iptablesCleanupChains = []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableNAT, kubeServicesChain},
{utiliptables.TableNAT, kubePostroutingChain},
{utiliptables.TableNAT, kubeNodePortChain},
{utiliptables.TableNAT, kubeLoadBalancerChain},
{utiliptables.TableFilter, kubeForwardChain},
{utiliptables.TableFilter, kubeNodePortChain},
{utiliptables.TableFilter, kubeProxyFirewallChain},
{utiliptables.TableFilter, kubeSourceRangesFirewallChain},
{utiliptables.TableFilter, kubeIPVSFilterChain},
{utiliptables.TableFilter, kubeIPVSOutFilterChain},
}
// ipsetInfo is all ipset we needed in ipvs proxier
var ipsetInfo = []struct {
name string
setType utilipset.Type
comment string
}{
{kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
{kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
{kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
{kubeExternalIPLocalSet, utilipset.HashIPPort, kubeExternalIPLocalSetComment},
{kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
{kubeLoadBalancerFWSet, utilipset.HashIPPort, kubeLoadBalancerFWSetComment},
{kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
{kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
{kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
{kubeNodePortSetTCP, utilipset.BitmapPort, kubeNodePortSetTCPComment},
{kubeNodePortLocalSetTCP, utilipset.BitmapPort, kubeNodePortLocalSetTCPComment},
{kubeNodePortSetUDP, utilipset.BitmapPort, kubeNodePortSetUDPComment},
{kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment},
{kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment},
{kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment},
{kubeHealthCheckNodePortSet, utilipset.BitmapPort, kubeHealthCheckNodePortSetComment},
{kubeIPVSSet, utilipset.HashIP, kubeIPVSSetComment},
}
// ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to
// `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>`
// example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
// ipsets with other match rules will be created Individually.
// Note: kubeNodePortLocalSetTCP must be prior to kubeNodePortSetTCP, the same for UDP.
var ipsetWithIptablesChain = []struct {
name string
table utiliptables.Table
from string
to string
matchType string
protocolMatch string
}{
{kubeLoopBackIPSet, utiliptables.TableNAT, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""},
{kubeLoadBalancerSet, utiliptables.TableNAT, string(kubeServicesChain), string(kubeLoadBalancerChain), "dst,dst", ""},
{kubeLoadBalancerLocalSet, utiliptables.TableNAT, string(kubeLoadBalancerChain), "RETURN", "dst,dst", ""},
{kubeNodePortLocalSetTCP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst", utilipset.ProtocolTCP},
{kubeNodePortSetTCP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst", utilipset.ProtocolTCP},
{kubeNodePortLocalSetUDP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP},
{kubeNodePortSetUDP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst", utilipset.ProtocolUDP},
{kubeNodePortLocalSetSCTP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP},
{kubeNodePortSetSCTP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
{kubeLoadBalancerFWSet, utiliptables.TableFilter, string(kubeProxyFirewallChain), string(kubeSourceRangesFirewallChain), "dst,dst", ""},
{kubeLoadBalancerSourceCIDRSet, utiliptables.TableFilter, string(kubeSourceRangesFirewallChain), "RETURN", "dst,dst,src", ""},
{kubeLoadBalancerSourceIPSet, utiliptables.TableFilter, string(kubeSourceRangesFirewallChain), "RETURN", "dst,dst,src", ""},
}
// In IPVS proxy mode, the following flags need to be set // In IPVS proxy mode, the following flags need to be set
const ( const (
sysctlVSConnTrack = "net/ipv4/vs/conntrack" sysctlVSConnTrack = "net/ipv4/vs/conntrack"
@ -215,6 +109,58 @@ const (
sysctlArpAnnounce = "net/ipv4/conf/all/arp_announce" sysctlArpAnnounce = "net/ipv4/conf/all/arp_announce"
) )
// NewDualStackProxier returns a new Proxier for dual-stack operation
func NewDualStackProxier(
ipt [2]utiliptables.Interface,
ipvs utilipvs.Interface,
ipset utilipset.Interface,
sysctl utilsysctl.Interface,
exec utilexec.Interface,
syncPeriod time.Duration,
minSyncPeriod time.Duration,
excludeCIDRs []string,
strictARP bool,
tcpTimeout time.Duration,
tcpFinTimeout time.Duration,
udpTimeout time.Duration,
masqueradeAll bool,
masqueradeBit int,
localDetectors [2]proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
scheduler string,
nodePortAddresses []string,
initOnly bool,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], ipvs, ipset, sysctl,
exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
localDetectors[0], hostname, nodeIPs[v1.IPv4Protocol], recorder,
healthzServer, scheduler, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], ipvs, ipset, sysctl,
exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
localDetectors[1], hostname, nodeIPs[v1.IPv6Protocol], recorder,
healthzServer, scheduler, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
}
if initOnly {
return nil, nil
}
// Return a meta-proxier that dispatch calls between the two
// single-stack proxier instances
return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
}
// Proxier is an ipvs based proxy for connections between a localhost:lport // Proxier is an ipvs based proxy for connections between a localhost:lport
// and services that provide the actual backends. // and services that provide the actual backends.
type Proxier struct { type Proxier struct {
@ -466,59 +412,6 @@ func NewProxier(ipFamily v1.IPFamily,
return proxier, nil return proxier, nil
} }
// NewDualStackProxier returns a new Proxier for dual-stack operation
func NewDualStackProxier(
ipt [2]utiliptables.Interface,
ipvs utilipvs.Interface,
ipset utilipset.Interface,
sysctl utilsysctl.Interface,
exec utilexec.Interface,
syncPeriod time.Duration,
minSyncPeriod time.Duration,
excludeCIDRs []string,
strictARP bool,
tcpTimeout time.Duration,
tcpFinTimeout time.Duration,
udpTimeout time.Duration,
masqueradeAll bool,
masqueradeBit int,
localDetectors [2]proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
scheduler string,
nodePortAddresses []string,
initOnly bool,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], ipvs, ipset, sysctl,
exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
localDetectors[0], hostname, nodeIPs[v1.IPv4Protocol], recorder,
healthzServer, scheduler, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], ipvs, ipset, sysctl,
exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
localDetectors[1], hostname, nodeIPs[v1.IPv6Protocol], recorder,
healthzServer, scheduler, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
}
if initOnly {
return nil, nil
}
// Return a meta-proxier that dispatch calls between the two
// single-stack proxier instances
return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
}
func filterCIDRs(wantIPv6 bool, cidrs []string) []string { func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
var filteredCIDRs []string var filteredCIDRs []string
for _, cidr := range cidrs { for _, cidr := range cidrs {
@ -529,6 +422,112 @@ func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
return filteredCIDRs return filteredCIDRs
} }
// iptablesJumpChain is tables of iptables chains that ipvs proxier used to install iptables or cleanup iptables.
// `to` is the iptables chain we want to operate.
// `from` is the source iptables chain
var iptablesJumpChain = []struct {
table utiliptables.Table
from utiliptables.Chain
to utiliptables.Chain
comment string
}{
{utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"},
{utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
{utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
{utiliptables.TableFilter, utiliptables.ChainForward, kubeForwardChain, "kubernetes forwarding rules"},
{utiliptables.TableFilter, utiliptables.ChainInput, kubeNodePortChain, "kubernetes health check rules"},
{utiliptables.TableFilter, utiliptables.ChainInput, kubeProxyFirewallChain, "kube-proxy firewall rules"},
{utiliptables.TableFilter, utiliptables.ChainForward, kubeProxyFirewallChain, "kube-proxy firewall rules"},
{utiliptables.TableFilter, utiliptables.ChainInput, kubeIPVSFilterChain, "kubernetes ipvs access filter"},
{utiliptables.TableFilter, utiliptables.ChainOutput, kubeIPVSOutFilterChain, "kubernetes ipvs access filter"},
}
var iptablesChains = []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableNAT, kubeServicesChain},
{utiliptables.TableNAT, kubePostroutingChain},
{utiliptables.TableNAT, kubeNodePortChain},
{utiliptables.TableNAT, kubeLoadBalancerChain},
{utiliptables.TableNAT, kubeMarkMasqChain},
{utiliptables.TableFilter, kubeForwardChain},
{utiliptables.TableFilter, kubeNodePortChain},
{utiliptables.TableFilter, kubeProxyFirewallChain},
{utiliptables.TableFilter, kubeSourceRangesFirewallChain},
{utiliptables.TableFilter, kubeIPVSFilterChain},
{utiliptables.TableFilter, kubeIPVSOutFilterChain},
}
var iptablesCleanupChains = []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableNAT, kubeServicesChain},
{utiliptables.TableNAT, kubePostroutingChain},
{utiliptables.TableNAT, kubeNodePortChain},
{utiliptables.TableNAT, kubeLoadBalancerChain},
{utiliptables.TableFilter, kubeForwardChain},
{utiliptables.TableFilter, kubeNodePortChain},
{utiliptables.TableFilter, kubeProxyFirewallChain},
{utiliptables.TableFilter, kubeSourceRangesFirewallChain},
{utiliptables.TableFilter, kubeIPVSFilterChain},
{utiliptables.TableFilter, kubeIPVSOutFilterChain},
}
// ipsetInfo is all ipset we needed in ipvs proxier
var ipsetInfo = []struct {
name string
setType utilipset.Type
comment string
}{
{kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
{kubeClusterIPSet, utilipset.HashIPPort, kubeClusterIPSetComment},
{kubeExternalIPSet, utilipset.HashIPPort, kubeExternalIPSetComment},
{kubeExternalIPLocalSet, utilipset.HashIPPort, kubeExternalIPLocalSetComment},
{kubeLoadBalancerSet, utilipset.HashIPPort, kubeLoadBalancerSetComment},
{kubeLoadBalancerFWSet, utilipset.HashIPPort, kubeLoadBalancerFWSetComment},
{kubeLoadBalancerLocalSet, utilipset.HashIPPort, kubeLoadBalancerLocalSetComment},
{kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
{kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
{kubeNodePortSetTCP, utilipset.BitmapPort, kubeNodePortSetTCPComment},
{kubeNodePortLocalSetTCP, utilipset.BitmapPort, kubeNodePortLocalSetTCPComment},
{kubeNodePortSetUDP, utilipset.BitmapPort, kubeNodePortSetUDPComment},
{kubeNodePortLocalSetUDP, utilipset.BitmapPort, kubeNodePortLocalSetUDPComment},
{kubeNodePortSetSCTP, utilipset.HashIPPort, kubeNodePortSetSCTPComment},
{kubeNodePortLocalSetSCTP, utilipset.HashIPPort, kubeNodePortLocalSetSCTPComment},
{kubeHealthCheckNodePortSet, utilipset.BitmapPort, kubeHealthCheckNodePortSetComment},
{kubeIPVSSet, utilipset.HashIP, kubeIPVSSetComment},
}
// ipsetWithIptablesChain is the ipsets list with iptables source chain and the chain jump to
// `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>`
// example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
// ipsets with other match rules will be created Individually.
// Note: kubeNodePortLocalSetTCP must be prior to kubeNodePortSetTCP, the same for UDP.
var ipsetWithIptablesChain = []struct {
name string
table utiliptables.Table
from string
to string
matchType string
protocolMatch string
}{
{kubeLoopBackIPSet, utiliptables.TableNAT, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src", ""},
{kubeLoadBalancerSet, utiliptables.TableNAT, string(kubeServicesChain), string(kubeLoadBalancerChain), "dst,dst", ""},
{kubeLoadBalancerLocalSet, utiliptables.TableNAT, string(kubeLoadBalancerChain), "RETURN", "dst,dst", ""},
{kubeNodePortLocalSetTCP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst", utilipset.ProtocolTCP},
{kubeNodePortSetTCP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst", utilipset.ProtocolTCP},
{kubeNodePortLocalSetUDP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst", utilipset.ProtocolUDP},
{kubeNodePortSetUDP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst", utilipset.ProtocolUDP},
{kubeNodePortLocalSetSCTP, utiliptables.TableNAT, string(kubeNodePortChain), "RETURN", "dst,dst", utilipset.ProtocolSCTP},
{kubeNodePortSetSCTP, utiliptables.TableNAT, string(kubeNodePortChain), string(kubeMarkMasqChain), "dst,dst", utilipset.ProtocolSCTP},
{kubeLoadBalancerFWSet, utiliptables.TableFilter, string(kubeProxyFirewallChain), string(kubeSourceRangesFirewallChain), "dst,dst", ""},
{kubeLoadBalancerSourceCIDRSet, utiliptables.TableFilter, string(kubeSourceRangesFirewallChain), "RETURN", "dst,dst,src", ""},
{kubeLoadBalancerSourceIPSet, utiliptables.TableFilter, string(kubeSourceRangesFirewallChain), "RETURN", "dst,dst,src", ""},
}
// internal struct for string service information // internal struct for string service information
type servicePortInfo struct { type servicePortInfo struct {
*proxy.BaseServicePortInfo *proxy.BaseServicePortInfo

View File

@ -103,51 +103,39 @@ const (
masqueradingChain = "masquerading" masqueradingChain = "masquerading"
) )
// internal struct for string service information // NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies.
type servicePortInfo struct { func NewDualStackProxier(
*proxy.BaseServicePortInfo sysctl utilsysctl.Interface,
// The following fields are computed and stored for performance reasons. syncPeriod time.Duration,
nameString string minSyncPeriod time.Duration,
clusterPolicyChainName string masqueradeAll bool,
localPolicyChainName string masqueradeBit int,
externalChainName string localDetectors [2]proxyutiliptables.LocalTrafficDetector,
firewallChainName string hostname string,
} nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder,
// returns a new proxy.ServicePort which abstracts a serviceInfo healthzServer *healthcheck.ProxierHealthServer,
func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort { nodePortAddresses []string,
svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo} initOnly bool,
) (proxy.Provider, error) {
// Store the following for performance reasons. // Create an ipv4 instance of the single-stack proxier
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} ipv4Proxier, err := NewProxier(v1.IPv4Protocol, sysctl,
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[0], hostname,
svcPort.nameString = svcPortName.String() nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
chainNameBase := servicePortChainNameBase(&svcPortName, strings.ToLower(string(svcPort.Protocol()))) return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
svcPort.clusterPolicyChainName = servicePortPolicyClusterChainNamePrefix + chainNameBase
svcPort.localPolicyChainName = servicePortPolicyLocalChainNamePrefix + chainNameBase
svcPort.externalChainName = serviceExternalChainNamePrefix + chainNameBase
svcPort.firewallChainName = servicePortFirewallChainNamePrefix + chainNameBase
return svcPort
}
// internal struct for endpoints information
type endpointInfo struct {
*proxy.BaseEndpointInfo
chainName string
affinitySetName string
}
// returns a new proxy.Endpoint which abstracts a endpointInfo
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
chainNameBase := servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String())
return &endpointInfo{
BaseEndpointInfo: baseInfo,
chainName: servicePortEndpointChainNamePrefix + chainNameBase,
affinitySetName: servicePortEndpointAffinityNamePrefix + chainNameBase,
} }
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, sysctl,
syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[1], hostname,
nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err)
}
if initOnly {
return nil, nil
}
return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
} }
// Proxier is an nftables based proxy // Proxier is an nftables based proxy
@ -276,39 +264,51 @@ func NewProxier(ipFamily v1.IPFamily,
return proxier, nil return proxier, nil
} }
// NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies. // internal struct for string service information
func NewDualStackProxier( type servicePortInfo struct {
sysctl utilsysctl.Interface, *proxy.BaseServicePortInfo
syncPeriod time.Duration, // The following fields are computed and stored for performance reasons.
minSyncPeriod time.Duration, nameString string
masqueradeAll bool, clusterPolicyChainName string
masqueradeBit int, localPolicyChainName string
localDetectors [2]proxyutiliptables.LocalTrafficDetector, externalChainName string
hostname string, firewallChainName string
nodeIPs map[v1.IPFamily]net.IP, }
recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer,
nodePortAddresses []string,
initOnly bool,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, sysctl,
syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[0], hostname,
nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, sysctl, // returns a new proxy.ServicePort which abstracts a serviceInfo
syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[1], hostname, func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly) svcPort := &servicePortInfo{BaseServicePortInfo: bsvcPortInfo}
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err) // Store the following for performance reasons.
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
svcPort.nameString = svcPortName.String()
chainNameBase := servicePortChainNameBase(&svcPortName, strings.ToLower(string(svcPort.Protocol())))
svcPort.clusterPolicyChainName = servicePortPolicyClusterChainNamePrefix + chainNameBase
svcPort.localPolicyChainName = servicePortPolicyLocalChainNamePrefix + chainNameBase
svcPort.externalChainName = serviceExternalChainNamePrefix + chainNameBase
svcPort.firewallChainName = servicePortFirewallChainNamePrefix + chainNameBase
return svcPort
}
// internal struct for endpoints information
type endpointInfo struct {
*proxy.BaseEndpointInfo
chainName string
affinitySetName string
}
// returns a new proxy.Endpoint which abstracts a endpointInfo
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
chainNameBase := servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String())
return &endpointInfo{
BaseEndpointInfo: baseInfo,
chainName: servicePortEndpointChainNamePrefix + chainNameBase,
affinitySetName: servicePortEndpointAffinityNamePrefix + chainNameBase,
} }
if initOnly {
return nil, nil
}
return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
} }
// nftablesBaseChains lists our "base chains"; those that are directly connected to the // nftablesBaseChains lists our "base chains"; those that are directly connected to the

View File

@ -596,10 +596,9 @@ type Proxier struct {
initialized int32 initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// These are effectively const and do not need the mutex to be held. // These are effectively const and do not need the mutex to be held.
clusterCIDR string hostname string
hostname string nodeIP net.IP
nodeIP net.IP recorder events.EventRecorder
recorder events.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer serviceHealthServer healthcheck.ServiceHealthServer
healthzServer *healthcheck.ProxierHealthServer healthzServer *healthcheck.ProxierHealthServer
@ -654,27 +653,28 @@ func NewProxier(
ipFamily v1.IPFamily, ipFamily v1.IPFamily,
syncPeriod time.Duration, syncPeriod time.Duration,
minSyncPeriod time.Duration, minSyncPeriod time.Duration,
clusterCIDR string,
hostname string, hostname string,
nodeIP net.IP, nodeIP net.IP,
recorder events.EventRecorder, recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer, healthzServer *healthcheck.ProxierHealthServer,
healthzBindAddress string,
config config.KubeProxyWinkernelConfiguration, config config.KubeProxyWinkernelConfiguration,
healthzPort int,
) (*Proxier, error) { ) (*Proxier, error) {
if nodeIP == nil { if nodeIP == nil {
klog.InfoS("Invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP") klog.InfoS("Invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
nodeIP = netutils.ParseIPSloppy("127.0.0.1") nodeIP = netutils.ParseIPSloppy("127.0.0.1")
} }
if len(clusterCIDR) == 0 {
klog.InfoS("ClusterCIDR not specified, unable to distinguish between internal and external traffic")
}
// windows listens to all node addresses // windows listens to all node addresses
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil, nil) nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil, nil)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
var healthzPort int
if len(healthzBindAddress) > 0 {
_, port, _ := net.SplitHostPort(healthzBindAddress)
healthzPort, _ = strconv.Atoi(port)
}
hcnImpl := newHcnImpl() hcnImpl := newHcnImpl()
hns, supportedFeatures := newHostNetworkService(hcnImpl) hns, supportedFeatures := newHostNetworkService(hcnImpl)
hnsNetworkName, err := getNetworkName(config.NetworkName) hnsNetworkName, err := getNetworkName(config.NetworkName)
@ -757,7 +757,6 @@ func NewProxier(
endPointsRefCount: make(endPointsReferenceCountMap), endPointsRefCount: make(endPointsReferenceCountMap),
svcPortMap: make(proxy.ServicePortMap), svcPortMap: make(proxy.ServicePortMap),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
clusterCIDR: clusterCIDR,
hostname: hostname, hostname: hostname,
nodeIP: nodeIP, nodeIP: nodeIP,
recorder: recorder, recorder: recorder,
@ -790,29 +789,28 @@ func NewProxier(
func NewDualStackProxier( func NewDualStackProxier(
syncPeriod time.Duration, syncPeriod time.Duration,
minSyncPeriod time.Duration, minSyncPeriod time.Duration,
clusterCIDR string,
hostname string, hostname string,
nodeIPs map[v1.IPFamily]net.IP, nodeIPs map[v1.IPFamily]net.IP,
recorder events.EventRecorder, recorder events.EventRecorder,
healthzServer *healthcheck.ProxierHealthServer, healthzServer *healthcheck.ProxierHealthServer,
healthzBindAddress string,
config config.KubeProxyWinkernelConfiguration, config config.KubeProxyWinkernelConfiguration,
healthzPort int,
) (proxy.Provider, error) { ) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier // Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod, ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
clusterCIDR, hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer, hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer,
config, healthzPort) healthzBindAddress, config)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIPs[v1.IPv4Protocol]) return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv4Protocol])
} }
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod, ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
clusterCIDR, hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer, hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer,
config, healthzPort) healthzBindAddress, config)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIPs[v1.IPv6Protocol]) return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv6Protocol])
} }
// Return a meta-proxier that dispatch calls between the two // Return a meta-proxier that dispatch calls between the two

View File

@ -46,7 +46,6 @@ const (
ipAddress = "10.0.0.1" ipAddress = "10.0.0.1"
prefixLen = 24 prefixLen = 24
macAddress = "00-11-22-33-44-55" macAddress = "00-11-22-33-44-55"
clusterCIDR = "192.168.1.0/24"
destinationPrefix = "192.168.2.0/24" destinationPrefix = "192.168.2.0/24"
providerAddress = "10.0.0.3" providerAddress = "10.0.0.3"
guid = "123ABC" guid = "123ABC"
@ -84,7 +83,7 @@ func newHnsNetwork(networkInfo *hnsNetworkInfo) *hcn.HostComputeNetwork {
return network return network
} }
func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier { func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, hostname string, nodeIP net.IP, networkType string) *Proxier {
sourceVip := "192.168.1.2" sourceVip := "192.168.1.2"
var remoteSubnets []*remoteSubnetInfo var remoteSubnets []*remoteSubnetInfo
rs := &remoteSubnetInfo{ rs := &remoteSubnetInfo{
@ -105,7 +104,6 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
proxier := &Proxier{ proxier := &Proxier{
svcPortMap: make(proxy.ServicePortMap), svcPortMap: make(proxy.ServicePortMap),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
clusterCIDR: clusterCIDR,
hostname: testHostName, hostname: testHostName,
nodeIP: nodeIP, nodeIP: nodeIP,
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
@ -132,7 +130,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
func TestCreateServiceVip(t *testing.T) { func TestCreateServiceVip(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -186,7 +184,7 @@ func TestCreateServiceVip(t *testing.T) {
func TestCreateRemoteEndpointOverlay(t *testing.T) { func TestCreateRemoteEndpointOverlay(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -251,7 +249,7 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
func TestCreateRemoteEndpointL2Bridge(t *testing.T) { func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge") proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge")
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -313,7 +311,7 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
} }
func TestSharedRemoteEndpointDelete(t *testing.T) { func TestSharedRemoteEndpointDelete(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge") proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge")
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -455,7 +453,7 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
} }
func TestSharedRemoteEndpointUpdate(t *testing.T) { func TestSharedRemoteEndpointUpdate(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge") proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), "L2Bridge")
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -629,7 +627,7 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
} }
func TestCreateLoadBalancer(t *testing.T) { func TestCreateLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -686,7 +684,7 @@ func TestCreateLoadBalancer(t *testing.T) {
func TestCreateDsrLoadBalancer(t *testing.T) { func TestCreateDsrLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -764,7 +762,7 @@ func TestCreateDsrLoadBalancer(t *testing.T) {
// loadbalancers will be created. // loadbalancers will be created.
func TestClusterIPLBInCreateDsrLoadBalancer(t *testing.T) { func TestClusterIPLBInCreateDsrLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
@ -845,7 +843,7 @@ func TestClusterIPLBInCreateDsrLoadBalancer(t *testing.T) {
func TestEndpointSlice(t *testing.T) { func TestEndpointSlice(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -925,7 +923,7 @@ func TestNoopEndpointSlice(t *testing.T) {
func TestFindRemoteSubnetProviderAddress(t *testing.T) { func TestFindRemoteSubnetProviderAddress(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }