pkg/proxy: add an ipFamily field to the winkernel proxier

Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
Daman Arora 2023-10-15 19:51:03 +05:30
parent 4e10ff91c5
commit 4ea6ec738c
2 changed files with 11 additions and 14 deletions

View File

@ -96,6 +96,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
) )
} else { } else {
proxier, err = winkernel.NewProxier( proxier, err = winkernel.NewProxier(
s.PrimaryIPFamily,
config.IPTables.SyncPeriod.Duration, config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration,
config.ClusterCIDR, config.ClusterCIDR,

View File

@ -590,6 +590,8 @@ type endPointsReferenceCountMap map[string]*uint16
// Proxier is an hns based proxy for connections between a localhost:lport // Proxier is an hns based proxy for connections between a localhost:lport
// and services that provide the actual backends. // and services that provide the actual backends.
type Proxier struct { type Proxier struct {
// ipFamily defines the IP family which this proxier is tracking.
ipFamily v1.IPFamily
// TODO(imroc): implement node handler for winkernel proxier. // TODO(imroc): implement node handler for winkernel proxier.
proxyconfig.NoopNodeHandler proxyconfig.NoopNodeHandler
@ -608,7 +610,6 @@ type Proxier struct {
// with some partial data after kube-proxy restart. // with some partial data after kube-proxy restart.
endpointSlicesSynced bool endpointSlicesSynced bool
servicesSynced bool servicesSynced bool
isIPv6Mode bool
initialized int32 initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// These are effectively const and do not need the mutex to be held. // These are effectively const and do not need the mutex to be held.
@ -666,6 +667,7 @@ var _ proxy.Provider = &Proxier{}
// NewProxier returns a new Proxier // NewProxier returns a new Proxier
func NewProxier( func NewProxier(
ipFamily v1.IPFamily,
syncPeriod time.Duration, syncPeriod time.Duration,
minSyncPeriod time.Duration, minSyncPeriod time.Duration,
clusterCIDR string, clusterCIDR string,
@ -685,12 +687,6 @@ func NewProxier(
klog.InfoS("ClusterCIDR not specified, unable to distinguish between internal and external traffic") klog.InfoS("ClusterCIDR not specified, unable to distinguish between internal and external traffic")
} }
isIPv6 := netutils.IsIPv6(nodeIP)
ipFamily := v1.IPv4Protocol
if isIPv6 {
ipFamily = v1.IPv6Protocol
}
// windows listens to all node addresses // windows listens to all node addresses
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil) nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
@ -771,6 +767,7 @@ func NewProxier(
} }
proxier := &Proxier{ proxier := &Proxier{
ipFamily: ipFamily,
endPointsRefCount: make(endPointsReferenceCountMap), endPointsRefCount: make(endPointsReferenceCountMap),
svcPortMap: make(proxy.ServicePortMap), svcPortMap: make(proxy.ServicePortMap),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
@ -786,7 +783,6 @@ func NewProxier(
hostMac: hostMac, hostMac: hostMac,
isDSR: isDSR, isDSR: isDSR,
supportedFeatures: supportedFeatures, supportedFeatures: supportedFeatures,
isIPv6Mode: isIPv6,
healthzPort: healthzPort, healthzPort: healthzPort,
rootHnsEndpointName: config.RootHnsEndpointName, rootHnsEndpointName: config.RootHnsEndpointName,
forwardHealthCheckVip: config.ForwardHealthCheckVip, forwardHealthCheckVip: config.ForwardHealthCheckVip,
@ -817,7 +813,7 @@ func NewDualStackProxier(
) (proxy.Provider, error) { ) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier // Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(syncPeriod, minSyncPeriod, ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
clusterCIDR, hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer, clusterCIDR, hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer,
config, healthzPort) config, healthzPort)
@ -825,7 +821,7 @@ func NewDualStackProxier(
return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIPs[v1.IPv4Protocol]) return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIPs[v1.IPv4Protocol])
} }
ipv6Proxier, err := NewProxier(syncPeriod, minSyncPeriod, ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
clusterCIDR, hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer, clusterCIDR, hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer,
config, healthzPort) config, healthzPort)
if err != nil { if err != nil {
@ -1455,7 +1451,7 @@ func (proxier *Proxier) syncProxyRules() {
// Cluster IP LoadBalancer creation // Cluster IP LoadBalancer creation
hnsLoadBalancer, err := hns.getLoadBalancer( hnsLoadBalancer, err := hns.getLoadBalancer(
clusterIPEndpoints, clusterIPEndpoints,
loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP}, loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
sourceVip, sourceVip,
svcInfo.ClusterIP().String(), svcInfo.ClusterIP().String(),
Enum(svcInfo.Protocol()), Enum(svcInfo.Protocol()),
@ -1490,7 +1486,7 @@ func (proxier *Proxier) syncProxyRules() {
// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer // If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
hnsLoadBalancer, err := hns.getLoadBalancer( hnsLoadBalancer, err := hns.getLoadBalancer(
nodePortEndpoints, nodePortEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
sourceVip, sourceVip,
"", "",
Enum(svcInfo.Protocol()), Enum(svcInfo.Protocol()),
@ -1525,7 +1521,7 @@ func (proxier *Proxier) syncProxyRules() {
// Try loading existing policies, if already available // Try loading existing policies, if already available
hnsLoadBalancer, err = hns.getLoadBalancer( hnsLoadBalancer, err = hns.getLoadBalancer(
externalIPEndpoints, externalIPEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
sourceVip, sourceVip,
externalIP.ip, externalIP.ip,
Enum(svcInfo.Protocol()), Enum(svcInfo.Protocol()),
@ -1556,7 +1552,7 @@ func (proxier *Proxier) syncProxyRules() {
if len(lbIngressEndpoints) > 0 { if len(lbIngressEndpoints) > 0 {
hnsLoadBalancer, err := hns.getLoadBalancer( hnsLoadBalancer, err := hns.getLoadBalancer(
lbIngressEndpoints, lbIngressEndpoints,
loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
sourceVip, sourceVip,
lbIngressIP.ip, lbIngressIP.ip,
Enum(svcInfo.Protocol()), Enum(svcInfo.Protocol()),