mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Merge pull request #52014 from m1093782566/reasons-sync
Automatic merge from submit-queue (batch tested with PRs 50294, 50422, 51757, 52379, 52014). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.. rsync ipvs proxier to HEAD of iptables **What this PR does / why we need it**: rsync ipvs proxier to HEAD of iptables. **Which issue this PR fixes**: xref #51679 **Special notes for your reviewer**: Obviously, @Lion-Wei has done part of this work, ref: #51922. It's fine that let #51922 get in first. **Release note**: ```release-note NONE ```
This commit is contained in:
commit
7008b9043b
@ -51,6 +51,7 @@ go_library(
|
|||||||
"//pkg/proxy:go_default_library",
|
"//pkg/proxy:go_default_library",
|
||||||
"//pkg/proxy/healthcheck:go_default_library",
|
"//pkg/proxy/healthcheck:go_default_library",
|
||||||
"//pkg/proxy/util:go_default_library",
|
"//pkg/proxy/util:go_default_library",
|
||||||
|
"//pkg/util/async:go_default_library",
|
||||||
"//pkg/util/iptables:go_default_library",
|
"//pkg/util/iptables:go_default_library",
|
||||||
"//pkg/util/ipvs:go_default_library",
|
"//pkg/util/ipvs:go_default_library",
|
||||||
"//pkg/util/sysctl:go_default_library",
|
"//pkg/util/sysctl:go_default_library",
|
||||||
@ -58,9 +59,9 @@ go_library(
|
|||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
|
|
||||||
"//vendor/k8s.io/utils/exec:go_default_library",
|
"//vendor/k8s.io/utils/exec:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -35,9 +36,9 @@ import (
|
|||||||
clientv1 "k8s.io/api/core/v1"
|
clientv1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
"k8s.io/client-go/util/flowcontrol"
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/helper"
|
"k8s.io/kubernetes/pkg/api/helper"
|
||||||
apiservice "k8s.io/kubernetes/pkg/api/service"
|
apiservice "k8s.io/kubernetes/pkg/api/service"
|
||||||
@ -45,6 +46,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/proxy"
|
"k8s.io/kubernetes/pkg/proxy"
|
||||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||||
|
"k8s.io/kubernetes/pkg/util/async"
|
||||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||||
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
|
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
|
||||||
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
|
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
|
||||||
@ -105,8 +107,8 @@ type Proxier struct {
|
|||||||
// with some partial data after kube-proxy restart.
|
// with some partial data after kube-proxy restart.
|
||||||
endpointsSynced bool
|
endpointsSynced bool
|
||||||
servicesSynced bool
|
servicesSynced bool
|
||||||
|
initialized int32
|
||||||
throttle flowcontrol.RateLimiter
|
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
|
||||||
|
|
||||||
// These are effectively const and do not need the mutex to be held.
|
// These are effectively const and do not need the mutex to be held.
|
||||||
syncPeriod time.Duration
|
syncPeriod time.Duration
|
||||||
@ -242,23 +244,14 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
|
|||||||
|
|
||||||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||||
|
|
||||||
var throttle flowcontrol.RateLimiter
|
proxier := &Proxier{
|
||||||
// Defaulting back to not limit sync rate when minSyncPeriod is 0.
|
|
||||||
if minSyncPeriod != 0 {
|
|
||||||
syncsPerSecond := float32(time.Second) / float32(minSyncPeriod)
|
|
||||||
// The average use case will process 2 updates in short succession
|
|
||||||
throttle = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Proxier{
|
|
||||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||||
serviceMap: make(proxyServiceMap),
|
serviceMap: make(proxyServiceMap),
|
||||||
serviceChanges: newServiceChangeMap(),
|
serviceChanges: newServiceChangeMap(),
|
||||||
endpointsMap: make(proxyEndpointsMap),
|
endpointsMap: make(proxyEndpointsMap),
|
||||||
endpointsChanges: newEndpointsChangeMap(),
|
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||||
syncPeriod: syncPeriod,
|
syncPeriod: syncPeriod,
|
||||||
minSyncPeriod: minSyncPeriod,
|
minSyncPeriod: minSyncPeriod,
|
||||||
throttle: throttle,
|
|
||||||
iptables: ipt,
|
iptables: ipt,
|
||||||
masqueradeAll: masqueradeAll,
|
masqueradeAll: masqueradeAll,
|
||||||
masqueradeMark: masqueradeMark,
|
masqueradeMark: masqueradeMark,
|
||||||
@ -276,7 +269,11 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
|
|||||||
iptablesData: bytes.NewBuffer(nil),
|
iptablesData: bytes.NewBuffer(nil),
|
||||||
natChains: bytes.NewBuffer(nil),
|
natChains: bytes.NewBuffer(nil),
|
||||||
natRules: bytes.NewBuffer(nil),
|
natRules: bytes.NewBuffer(nil),
|
||||||
}, nil
|
}
|
||||||
|
burstSyncs := 2
|
||||||
|
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
|
||||||
|
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
|
||||||
|
return proxier, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
|
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
|
||||||
@ -294,37 +291,41 @@ type serviceInfo struct {
|
|||||||
loadBalancerSourceRanges []string
|
loadBalancerSourceRanges []string
|
||||||
onlyNodeLocalEndpoints bool
|
onlyNodeLocalEndpoints bool
|
||||||
healthCheckNodePort int
|
healthCheckNodePort int
|
||||||
|
// The following fields are computed and stored for performance reasons.
|
||||||
|
serviceNameString string
|
||||||
}
|
}
|
||||||
|
|
||||||
// <serviceMap> is updated by this function (based on the given changes).
|
// <serviceMap> is updated by this function (based on the given changes).
|
||||||
// <changes> map is cleared after applying them.
|
// <changes> map is cleared after applying them.
|
||||||
func updateServiceMap(
|
func updateServiceMap(
|
||||||
serviceMap proxyServiceMap,
|
serviceMap proxyServiceMap,
|
||||||
changes *serviceChangeMap) (syncRequired bool, hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
|
changes *serviceChangeMap) (result updateServiceMapResult) {
|
||||||
syncRequired = false
|
result.staleServices = sets.NewString()
|
||||||
staleServices = sets.NewString()
|
|
||||||
|
|
||||||
for _, change := range changes.items {
|
func() {
|
||||||
mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current)
|
changes.lock.Lock()
|
||||||
unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices)
|
defer changes.lock.Unlock()
|
||||||
syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired
|
for _, change := range changes.items {
|
||||||
}
|
existingPorts := serviceMap.merge(change.current)
|
||||||
changes.items = make(map[types.NamespacedName]*serviceChange)
|
serviceMap.unmerge(change.previous, existingPorts, result.staleServices)
|
||||||
|
}
|
||||||
|
changes.items = make(map[types.NamespacedName]*serviceChange)
|
||||||
|
}()
|
||||||
|
|
||||||
// TODO: If this will appear to be computationally expensive, consider
|
// TODO: If this will appear to be computationally expensive, consider
|
||||||
// computing this incrementally similarly to serviceMap.
|
// computing this incrementally similarly to serviceMap.
|
||||||
hcServices = make(map[types.NamespacedName]uint16)
|
result.hcServices = make(map[types.NamespacedName]uint16)
|
||||||
for svcPort, info := range serviceMap {
|
for svcPortName, info := range serviceMap {
|
||||||
if info.healthCheckNodePort != 0 {
|
if info.healthCheckNodePort != 0 {
|
||||||
hcServices[svcPort.NamespacedName] = uint16(info.healthCheckNodePort)
|
result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return syncRequired, hcServices, staleServices
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns a new serviceInfo struct
|
// returns a new serviceInfo struct
|
||||||
func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
|
func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
|
||||||
onlyNodeLocalEndpoints := false
|
onlyNodeLocalEndpoints := false
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) &&
|
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) &&
|
||||||
apiservice.RequestsOnlyLocalTraffic(service) {
|
apiservice.RequestsOnlyLocalTraffic(service) {
|
||||||
@ -347,90 +348,77 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
|
|||||||
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
|
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
|
||||||
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
|
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
|
||||||
}
|
}
|
||||||
|
|
||||||
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
|
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
|
||||||
copy(info.externalIPs, service.Spec.ExternalIPs)
|
copy(info.externalIPs, service.Spec.ExternalIPs)
|
||||||
|
|
||||||
if apiservice.NeedsHealthCheck(service) {
|
if apiservice.NeedsHealthCheck(service) {
|
||||||
p := service.Spec.HealthCheckNodePort
|
p := service.Spec.HealthCheckNodePort
|
||||||
if p == 0 {
|
if p == 0 {
|
||||||
glog.Errorf("Service %q has no healthcheck nodeport", serviceName)
|
glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String())
|
||||||
} else {
|
} else {
|
||||||
info.healthCheckNodePort = int(p)
|
info.healthCheckNodePort = int(p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Store the following for performance reasons.
|
||||||
|
info.serviceNameString = svcPortName.String()
|
||||||
|
|
||||||
return info
|
return info
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) {
|
func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
|
||||||
if service == nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
|
||||||
if utilproxy.ShouldSkipService(svcName, service) {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
syncRequired := false
|
|
||||||
existingPorts := sets.NewString()
|
existingPorts := sets.NewString()
|
||||||
for i := range service.Spec.Ports {
|
for svcPortName, info := range other {
|
||||||
servicePort := &service.Spec.Ports[i]
|
existingPorts.Insert(svcPortName.Port)
|
||||||
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
|
_, exists := (*sm)[svcPortName]
|
||||||
existingPorts.Insert(servicePort.Name)
|
if !exists {
|
||||||
info := newServiceInfo(serviceName, servicePort, service)
|
glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
|
||||||
oldInfo, exists := (*sm)[serviceName]
|
} else {
|
||||||
equal := reflect.DeepEqual(info, oldInfo)
|
glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
|
||||||
if exists {
|
|
||||||
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
|
|
||||||
} else if !equal {
|
|
||||||
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
|
|
||||||
}
|
|
||||||
if !equal {
|
|
||||||
(*sm)[serviceName] = info
|
|
||||||
syncRequired = true
|
|
||||||
}
|
}
|
||||||
|
(*sm)[svcPortName] = info
|
||||||
}
|
}
|
||||||
return syncRequired, existingPorts
|
return existingPorts
|
||||||
}
|
}
|
||||||
|
|
||||||
// <staleServices> are modified by this function with detected stale services.
|
func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) {
|
||||||
func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool {
|
for svcPortName := range other {
|
||||||
if service == nil {
|
if existingPorts.Has(svcPortName.Port) {
|
||||||
return false
|
|
||||||
}
|
|
||||||
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
|
||||||
if utilproxy.ShouldSkipService(svcName, service) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
syncRequired := false
|
|
||||||
for i := range service.Spec.Ports {
|
|
||||||
servicePort := &service.Spec.Ports[i]
|
|
||||||
if existingPorts.Has(servicePort.Name) {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
|
info, exists := (*sm)[svcPortName]
|
||||||
info, exists := (*sm)[serviceName]
|
|
||||||
if exists {
|
if exists {
|
||||||
glog.V(1).Infof("Removing service %q", serviceName)
|
glog.V(1).Infof("Removing service port %q", svcPortName)
|
||||||
if info.protocol == api.ProtocolUDP {
|
if info.protocol == api.ProtocolUDP {
|
||||||
staleServices.Insert(info.clusterIP.String())
|
staleServices.Insert(info.clusterIP.String())
|
||||||
}
|
}
|
||||||
delete(*sm, serviceName)
|
delete(*sm, svcPortName)
|
||||||
syncRequired = true
|
|
||||||
} else {
|
} else {
|
||||||
glog.Errorf("Service %q removed, but doesn't exists", serviceName)
|
glog.Errorf("Service port %q removed, but doesn't exists", svcPortName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return syncRequired
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type serviceChangeMap struct {
|
type serviceChangeMap struct {
|
||||||
sync.Mutex
|
lock sync.Mutex
|
||||||
items map[types.NamespacedName]*serviceChange
|
items map[types.NamespacedName]*serviceChange
|
||||||
}
|
}
|
||||||
|
|
||||||
type serviceChange struct {
|
type serviceChange struct {
|
||||||
previous *api.Service
|
previous proxyServiceMap
|
||||||
current *api.Service
|
current proxyServiceMap
|
||||||
|
}
|
||||||
|
|
||||||
|
type updateEndpointMapResult struct {
|
||||||
|
hcEndpoints map[types.NamespacedName]int
|
||||||
|
staleEndpoints map[endpointServicePair]bool
|
||||||
|
staleServiceNames map[proxy.ServicePortName]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type updateServiceMapResult struct {
|
||||||
|
hcServices map[types.NamespacedName]uint16
|
||||||
|
staleServices sets.String
|
||||||
}
|
}
|
||||||
|
|
||||||
func newServiceChangeMap() serviceChangeMap {
|
func newServiceChangeMap() serviceChangeMap {
|
||||||
@ -439,17 +427,42 @@ func newServiceChangeMap() serviceChangeMap {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) {
|
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool {
|
||||||
scm.Lock()
|
scm.lock.Lock()
|
||||||
defer scm.Unlock()
|
defer scm.lock.Unlock()
|
||||||
|
|
||||||
change, exists := scm.items[*namespacedName]
|
change, exists := scm.items[*namespacedName]
|
||||||
if !exists {
|
if !exists {
|
||||||
change = &serviceChange{}
|
change = &serviceChange{}
|
||||||
change.previous = previous
|
change.previous = serviceToServiceMap(previous)
|
||||||
scm.items[*namespacedName] = change
|
scm.items[*namespacedName] = change
|
||||||
}
|
}
|
||||||
change.current = current
|
change.current = serviceToServiceMap(current)
|
||||||
|
if reflect.DeepEqual(change.previous, change.current) {
|
||||||
|
delete(scm.items, *namespacedName)
|
||||||
|
}
|
||||||
|
return len(scm.items) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Translates single Service object to proxyServiceMap.
|
||||||
|
//
|
||||||
|
// NOTE: service object should NOT be modified.
|
||||||
|
func serviceToServiceMap(service *api.Service) proxyServiceMap {
|
||||||
|
if service == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
|
if utilproxy.ShouldSkipService(svcName, service) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceMap := make(proxyServiceMap)
|
||||||
|
for i := range service.Spec.Ports {
|
||||||
|
servicePort := &service.Spec.Ports[i]
|
||||||
|
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
|
||||||
|
serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service)
|
||||||
|
}
|
||||||
|
return serviceMap
|
||||||
}
|
}
|
||||||
|
|
||||||
// internal struct for endpoints information
|
// internal struct for endpoints information
|
||||||
@ -462,6 +475,14 @@ func (e *endpointsInfo) String() string {
|
|||||||
return fmt.Sprintf("%v", *e)
|
return fmt.Sprintf("%v", *e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IPPart returns just the IP part of the endpoint.
|
||||||
|
func (e *endpointsInfo) IPPart() string {
|
||||||
|
if index := strings.Index(e.endpoint, ":"); index != -1 {
|
||||||
|
return e.endpoint[0:index]
|
||||||
|
}
|
||||||
|
return e.endpoint
|
||||||
|
}
|
||||||
|
|
||||||
type endpointServicePair struct {
|
type endpointServicePair struct {
|
||||||
endpoint string
|
endpoint string
|
||||||
servicePortName proxy.ServicePortName
|
servicePortName proxy.ServicePortName
|
||||||
@ -470,33 +491,40 @@ type endpointServicePair struct {
|
|||||||
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
|
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
|
||||||
|
|
||||||
type endpointsChange struct {
|
type endpointsChange struct {
|
||||||
previous *api.Endpoints
|
previous proxyEndpointsMap
|
||||||
current *api.Endpoints
|
current proxyEndpointsMap
|
||||||
}
|
}
|
||||||
|
|
||||||
type endpointsChangeMap struct {
|
type endpointsChangeMap struct {
|
||||||
sync.Mutex
|
lock sync.Mutex
|
||||||
items map[types.NamespacedName]*endpointsChange
|
hostname string
|
||||||
|
items map[types.NamespacedName]*endpointsChange
|
||||||
}
|
}
|
||||||
|
|
||||||
// <staleEndpoints> are modified by this function with detected stale
|
// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections.
|
||||||
// connections.
|
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
|
||||||
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
|
for svcPortName, epList := range oldEndpointsMap {
|
||||||
for svcPort, epList := range oldEndpointsMap {
|
|
||||||
for _, ep := range epList {
|
for _, ep := range epList {
|
||||||
stale := true
|
stale := true
|
||||||
for i := range newEndpointsMap[svcPort] {
|
for i := range newEndpointsMap[svcPortName] {
|
||||||
if *newEndpointsMap[svcPort][i] == *ep {
|
if *newEndpointsMap[svcPortName][i] == *ep {
|
||||||
stale = false
|
stale = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if stale {
|
if stale {
|
||||||
glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint)
|
glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint)
|
||||||
staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
|
staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for svcPortName, epList := range newEndpointsMap {
|
||||||
|
// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
|
||||||
|
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
|
||||||
|
staleServiceNames[svcPortName] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// <endpointsMap> is updated by this function (based on the given changes).
|
// <endpointsMap> is updated by this function (based on the given changes).
|
||||||
@ -504,20 +532,20 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap,
|
|||||||
func updateEndpointsMap(
|
func updateEndpointsMap(
|
||||||
endpointsMap proxyEndpointsMap,
|
endpointsMap proxyEndpointsMap,
|
||||||
changes *endpointsChangeMap,
|
changes *endpointsChangeMap,
|
||||||
hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
|
hostname string) (result updateEndpointMapResult) {
|
||||||
syncRequired = false
|
result.staleEndpoints = make(map[endpointServicePair]bool)
|
||||||
staleSet = make(map[endpointServicePair]bool)
|
result.staleServiceNames = make(map[proxy.ServicePortName]bool)
|
||||||
for _, change := range changes.items {
|
|
||||||
oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname)
|
func() {
|
||||||
newEndpointsMap := endpointsToEndpointsMap(change.current, hostname)
|
changes.lock.Lock()
|
||||||
if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) {
|
defer changes.lock.Unlock()
|
||||||
endpointsMap.unmerge(oldEndpointsMap)
|
for _, change := range changes.items {
|
||||||
endpointsMap.merge(newEndpointsMap)
|
endpointsMap.unmerge(change.previous)
|
||||||
detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet)
|
endpointsMap.merge(change.current)
|
||||||
syncRequired = true
|
detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames)
|
||||||
}
|
}
|
||||||
}
|
changes.items = make(map[types.NamespacedName]*endpointsChange)
|
||||||
changes.items = make(map[types.NamespacedName]*endpointsChange)
|
}()
|
||||||
|
|
||||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
|
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
|
||||||
return
|
return
|
||||||
@ -525,13 +553,13 @@ func updateEndpointsMap(
|
|||||||
|
|
||||||
// TODO: If this will appear to be computationally expensive, consider
|
// TODO: If this will appear to be computationally expensive, consider
|
||||||
// computing this incrementally similarly to endpointsMap.
|
// computing this incrementally similarly to endpointsMap.
|
||||||
hcEndpoints = make(map[types.NamespacedName]int)
|
result.hcEndpoints = make(map[types.NamespacedName]int)
|
||||||
localIPs := getLocalIPs(endpointsMap)
|
localIPs := getLocalIPs(endpointsMap)
|
||||||
for nsn, ips := range localIPs {
|
for nsn, ips := range localIPs {
|
||||||
hcEndpoints[nsn] = len(ips)
|
result.hcEndpoints[nsn] = len(ips)
|
||||||
}
|
}
|
||||||
|
|
||||||
return syncRequired, hcEndpoints, staleSet
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// Translates single Endpoints object to proxyEndpointsMap.
|
// Translates single Endpoints object to proxyEndpointsMap.
|
||||||
@ -582,23 +610,28 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd
|
|||||||
return endpointsMap
|
return endpointsMap
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEndpointsChangeMap() endpointsChangeMap {
|
func newEndpointsChangeMap(hostname string) endpointsChangeMap {
|
||||||
return endpointsChangeMap{
|
return endpointsChangeMap{
|
||||||
items: make(map[types.NamespacedName]*endpointsChange),
|
hostname: hostname,
|
||||||
|
items: make(map[types.NamespacedName]*endpointsChange),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ecm *endpointsChangeMap) Update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) {
|
func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool {
|
||||||
ecm.Lock()
|
ecm.lock.Lock()
|
||||||
defer ecm.Unlock()
|
defer ecm.lock.Unlock()
|
||||||
|
|
||||||
change, exists := ecm.items[*namespacedName]
|
change, exists := ecm.items[*namespacedName]
|
||||||
if !exists {
|
if !exists {
|
||||||
change = &endpointsChange{}
|
change = &endpointsChange{}
|
||||||
change.previous = previous
|
change.previous = endpointsToEndpointsMap(previous, ecm.hostname)
|
||||||
ecm.items[*namespacedName] = change
|
ecm.items[*namespacedName] = change
|
||||||
}
|
}
|
||||||
change.current = current
|
change.current = endpointsToEndpointsMap(current, ecm.hostname)
|
||||||
|
if reflect.DeepEqual(change.previous, change.current) {
|
||||||
|
delete(ecm.items, *namespacedName)
|
||||||
|
}
|
||||||
|
return len(ecm.items) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
|
func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
|
||||||
@ -726,81 +759,89 @@ func CleanupLeftovers(execer utilexec.Interface, ipvs utilipvs.Interface, ipt ut
|
|||||||
return encounteredError
|
return encounteredError
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync is called to immediately synchronize the proxier state to iptables
|
// Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
|
||||||
func (proxier *Proxier) Sync() {
|
func (proxier *Proxier) Sync() {
|
||||||
proxier.syncProxyRules(syncReasonForce)
|
proxier.syncRunner.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
|
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
|
||||||
func (proxier *Proxier) SyncLoop() {
|
func (proxier *Proxier) SyncLoop() {
|
||||||
t := time.NewTicker(proxier.syncPeriod)
|
|
||||||
defer t.Stop()
|
|
||||||
// Update healthz timestamp at beginning in case Sync() never succeeds.
|
// Update healthz timestamp at beginning in case Sync() never succeeds.
|
||||||
if proxier.healthzServer != nil {
|
if proxier.healthzServer != nil {
|
||||||
proxier.healthzServer.UpdateTimestamp()
|
proxier.healthzServer.UpdateTimestamp()
|
||||||
}
|
}
|
||||||
for {
|
proxier.syncRunner.Loop(wait.NeverStop)
|
||||||
<-t.C
|
}
|
||||||
glog.V(6).Infof("Periodic sync")
|
|
||||||
proxier.Sync()
|
func (proxier *Proxier) setInitialized(value bool) {
|
||||||
|
var initialized int32
|
||||||
|
if value {
|
||||||
|
initialized = 1
|
||||||
}
|
}
|
||||||
|
atomic.StoreInt32(&proxier.initialized, initialized)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (proxier *Proxier) isInitialized() bool {
|
||||||
|
return atomic.LoadInt32(&proxier.initialized) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnServiceAdd is called whenever creation of new service object is observed.
|
// OnServiceAdd is called whenever creation of new service object is observed.
|
||||||
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
|
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
|
||||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
proxier.serviceChanges.update(&namespacedName, nil, service)
|
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
|
||||||
|
proxier.syncRunner.Run()
|
||||||
proxier.syncProxyRules(syncReasonServices)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnServiceUpdate is called whenever modification of an existing service object is observed.
|
// OnServiceUpdate is called whenever modification of an existing service object is observed.
|
||||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
|
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
|
||||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
proxier.serviceChanges.update(&namespacedName, oldService, service)
|
if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() {
|
||||||
|
proxier.syncRunner.Run()
|
||||||
proxier.syncProxyRules(syncReasonServices)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnServiceDelete is called whenever deletion of an existing service object is observed.
|
// OnServiceDelete is called whenever deletion of an existing service object is observed.
|
||||||
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
|
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
|
||||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||||
proxier.serviceChanges.update(&namespacedName, service, nil)
|
if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() {
|
||||||
|
proxier.syncRunner.Run()
|
||||||
proxier.syncProxyRules(syncReasonServices)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnServiceSynced is called once all the initial even handlers were called and the state is fully propagated to local cache.
|
// OnServiceSynced is called once all the initial even handlers were called and the state is fully propagated to local cache.
|
||||||
func (proxier *Proxier) OnServiceSynced() {
|
func (proxier *Proxier) OnServiceSynced() {
|
||||||
proxier.mu.Lock()
|
proxier.mu.Lock()
|
||||||
proxier.servicesSynced = true
|
proxier.servicesSynced = true
|
||||||
|
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
|
||||||
proxier.mu.Unlock()
|
proxier.mu.Unlock()
|
||||||
|
|
||||||
proxier.syncProxyRules(syncReasonServices)
|
// Sync unconditionally - this is called once per lifetime.
|
||||||
|
proxier.syncProxyRules()
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnEndpointsAdd is called whenever creation of new endpoints object is observed.
|
// OnEndpointsAdd is called whenever creation of new endpoints object is observed.
|
||||||
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
|
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
|
||||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||||
proxier.endpointsChanges.Update(&namespacedName, nil, endpoints)
|
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() {
|
||||||
|
proxier.syncRunner.Run()
|
||||||
proxier.syncProxyRules(syncReasonEndpoints)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
|
// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
|
||||||
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
|
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
|
||||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||||
proxier.endpointsChanges.Update(&namespacedName, oldEndpoints, endpoints)
|
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() {
|
||||||
|
proxier.syncRunner.Run()
|
||||||
proxier.syncProxyRules(syncReasonEndpoints)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
|
// OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
|
||||||
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
|
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
|
||||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||||
proxier.endpointsChanges.Update(&namespacedName, endpoints, nil)
|
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() {
|
||||||
|
proxier.syncRunner.Run()
|
||||||
proxier.syncProxyRules(syncReasonEndpoints)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
|
// OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
|
||||||
@ -809,7 +850,7 @@ func (proxier *Proxier) OnEndpointsSynced() {
|
|||||||
proxier.endpointsSynced = true
|
proxier.endpointsSynced = true
|
||||||
proxier.mu.Unlock()
|
proxier.mu.Unlock()
|
||||||
|
|
||||||
proxier.syncProxyRules(syncReasonEndpoints)
|
proxier.syncProxyRules()
|
||||||
}
|
}
|
||||||
|
|
||||||
type syncReason string
|
type syncReason string
|
||||||
@ -820,16 +861,13 @@ const syncReasonForce syncReason = "Force"
|
|||||||
|
|
||||||
// This is where all of the ipvs calls happen.
|
// This is where all of the ipvs calls happen.
|
||||||
// assumes proxier.mu is held
|
// assumes proxier.mu is held
|
||||||
func (proxier *Proxier) syncProxyRules(reason syncReason) {
|
func (proxier *Proxier) syncProxyRules() {
|
||||||
proxier.mu.Lock()
|
proxier.mu.Lock()
|
||||||
defer proxier.mu.Unlock()
|
defer proxier.mu.Unlock()
|
||||||
|
|
||||||
if proxier.throttle != nil {
|
|
||||||
proxier.throttle.Accept()
|
|
||||||
}
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
|
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
|
||||||
}()
|
}()
|
||||||
// don't sync rules till we've received services and endpoints
|
// don't sync rules till we've received services and endpoints
|
||||||
if !proxier.endpointsSynced || !proxier.servicesSynced {
|
if !proxier.endpointsSynced || !proxier.servicesSynced {
|
||||||
@ -837,27 +875,21 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Figure out the new services we need to activate.
|
// We assume that if this was called, we really want to sync them,
|
||||||
proxier.serviceChanges.Lock()
|
// even if nothing changed in the meantime. In other words, callers are
|
||||||
serviceSyncRequired, hcServices, staleServices := updateServiceMap(
|
// responsible for detecting no-op changes and not calling this function.
|
||||||
|
serviceUpdateResult := updateServiceMap(
|
||||||
proxier.serviceMap, &proxier.serviceChanges)
|
proxier.serviceMap, &proxier.serviceChanges)
|
||||||
proxier.serviceChanges.Unlock()
|
endpointUpdateResult := updateEndpointsMap(
|
||||||
|
|
||||||
// If this was called because of a services update, but nothing actionable has changed, skip it.
|
|
||||||
if reason == syncReasonServices && !serviceSyncRequired {
|
|
||||||
glog.V(3).Infof("Skipping ipvs sync because nothing changed")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
proxier.endpointsChanges.Lock()
|
|
||||||
endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
|
|
||||||
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
|
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
|
||||||
proxier.endpointsChanges.Unlock()
|
|
||||||
|
|
||||||
// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
|
staleServices := serviceUpdateResult.staleServices
|
||||||
if reason == syncReasonEndpoints && !endpointsSyncRequired {
|
// merge stale services gathered from updateEndpointsMap
|
||||||
glog.V(3).Infof("Skipping ipvs sync because nothing changed")
|
for svcPortName := range endpointUpdateResult.staleServiceNames {
|
||||||
return
|
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP {
|
||||||
|
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
|
||||||
|
staleServices.Insert(svcInfo.clusterIP.String())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(3).Infof("Syncing ipvs Proxier rules")
|
glog.V(3).Infof("Syncing ipvs Proxier rules")
|
||||||
@ -1166,13 +1198,14 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
|
|||||||
|
|
||||||
// Sync iptables rules.
|
// Sync iptables rules.
|
||||||
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
|
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
|
||||||
natLines := append(proxier.natChains.Bytes(), proxier.natRules.Bytes()...)
|
proxier.iptablesData.Reset()
|
||||||
lines := natLines
|
proxier.iptablesData.Write(proxier.natChains.Bytes())
|
||||||
|
proxier.iptablesData.Write(proxier.natRules.Bytes())
|
||||||
|
|
||||||
glog.V(3).Infof("Restoring iptables rules: %s", lines)
|
glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
|
||||||
err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
|
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, lines)
|
glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
|
||||||
// Revert new local ports.
|
// Revert new local ports.
|
||||||
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
|
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
|
||||||
return
|
return
|
||||||
@ -1197,18 +1230,18 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
|
|||||||
}
|
}
|
||||||
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
|
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
|
||||||
|
|
||||||
// Update healthz timestamp if it is periodic sync.
|
// Update healthz timestamp
|
||||||
if proxier.healthzServer != nil && reason == syncReasonForce {
|
if proxier.healthzServer != nil {
|
||||||
proxier.healthzServer.UpdateTimestamp()
|
proxier.healthzServer.UpdateTimestamp()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update healthchecks. The endpoints list might include services that are
|
// Update healthchecks. The endpoints list might include services that are
|
||||||
// not "OnlyLocal", but the services list will not, and the healthChecker
|
// not "OnlyLocal", but the services list will not, and the healthChecker
|
||||||
// will just drop those endpoints.
|
// will just drop those endpoints.
|
||||||
if err := proxier.healthChecker.SyncServices(hcServices); err != nil {
|
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
|
||||||
glog.Errorf("Error syncing healtcheck services: %v", err)
|
glog.Errorf("Error syncing healtcheck services: %v", err)
|
||||||
}
|
}
|
||||||
if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
|
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
|
||||||
glog.Errorf("Error syncing healthcheck endpoints: %v", err)
|
glog.Errorf("Error syncing healthcheck endpoints: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1219,7 +1252,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
|
|||||||
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
|
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
proxier.deleteEndpointConnections(staleEndpoints)
|
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
|
||||||
}
|
}
|
||||||
|
|
||||||
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
|
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
|
||||||
@ -1408,7 +1441,15 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables.
|
|||||||
|
|
||||||
// Join all words with spaces, terminate with newline and write to buff.
|
// Join all words with spaces, terminate with newline and write to buff.
|
||||||
func writeLine(buf *bytes.Buffer, words ...string) {
|
func writeLine(buf *bytes.Buffer, words ...string) {
|
||||||
buf.WriteString(strings.Join(words, " ") + "\n")
|
// We avoid strings.Join for performance reasons.
|
||||||
|
for i := range words {
|
||||||
|
buf.WriteString(words[i])
|
||||||
|
if i < len(words)-1 {
|
||||||
|
buf.WriteByte(' ')
|
||||||
|
} else {
|
||||||
|
buf.WriteByte('\n')
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
|
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
|
||||||
@ -1420,8 +1461,7 @@ func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.S
|
|||||||
if localIPs[nsn] == nil {
|
if localIPs[nsn] == nil {
|
||||||
localIPs[nsn] = sets.NewString()
|
localIPs[nsn] = sets.NewString()
|
||||||
}
|
}
|
||||||
ip := strings.Split(ep.endpoint, ":")[0] // just the IP part
|
localIPs[nsn].Insert(ep.IPPart()) // just the IP part
|
||||||
localIPs[nsn].Insert(ip)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs
|
|||||||
serviceMap: make(proxyServiceMap),
|
serviceMap: make(proxyServiceMap),
|
||||||
serviceChanges: newServiceChangeMap(),
|
serviceChanges: newServiceChangeMap(),
|
||||||
endpointsMap: make(proxyEndpointsMap),
|
endpointsMap: make(proxyEndpointsMap),
|
||||||
endpointsChanges: newEndpointsChangeMap(),
|
endpointsChanges: newEndpointsChangeMap(testHostname),
|
||||||
iptables: ipt,
|
iptables: ipt,
|
||||||
ipvs: ipvs,
|
ipvs: ipvs,
|
||||||
clusterCIDR: "10.0.0.0/24",
|
clusterCIDR: "10.0.0.0/24",
|
||||||
@ -208,7 +208,7 @@ func TestNodePort(t *testing.T) {
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
|
|
||||||
// Check ipvs service and destinations
|
// Check ipvs service and destinations
|
||||||
services, err := ipvs.GetVirtualServers()
|
services, err := ipvs.GetVirtualServers()
|
||||||
@ -266,7 +266,7 @@ func TestNodePortNoEndpoint(t *testing.T) {
|
|||||||
)
|
)
|
||||||
makeEndpointsMap(fp)
|
makeEndpointsMap(fp)
|
||||||
|
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
|
|
||||||
// Check ipvs service and destinations
|
// Check ipvs service and destinations
|
||||||
services, err := ipvs.GetVirtualServers()
|
services, err := ipvs.GetVirtualServers()
|
||||||
@ -314,7 +314,7 @@ func TestClusterIPNoEndpoint(t *testing.T) {
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
makeEndpointsMap(fp)
|
makeEndpointsMap(fp)
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
|
|
||||||
// check ipvs service and destinations
|
// check ipvs service and destinations
|
||||||
services, err := ipvs.GetVirtualServers()
|
services, err := ipvs.GetVirtualServers()
|
||||||
@ -372,7 +372,7 @@ func TestClusterIP(t *testing.T) {
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
|
|
||||||
// check ipvs service and destinations
|
// check ipvs service and destinations
|
||||||
services, err := ipvs.GetVirtualServers()
|
services, err := ipvs.GetVirtualServers()
|
||||||
@ -423,7 +423,7 @@ func TestExternalIPsNoEndpoint(t *testing.T) {
|
|||||||
|
|
||||||
makeEndpointsMap(fp)
|
makeEndpointsMap(fp)
|
||||||
|
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
|
|
||||||
// check ipvs service and destinations
|
// check ipvs service and destinations
|
||||||
services, err := ipvs.GetVirtualServers()
|
services, err := ipvs.GetVirtualServers()
|
||||||
@ -490,7 +490,7 @@ func TestExternalIPs(t *testing.T) {
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
|
|
||||||
// check ipvs service and destinations
|
// check ipvs service and destinations
|
||||||
services, err := ipvs.GetVirtualServers()
|
services, err := ipvs.GetVirtualServers()
|
||||||
@ -562,7 +562,7 @@ func TestLoadBalancer(t *testing.T) {
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
}
|
}
|
||||||
|
|
||||||
func strPtr(s string) *string {
|
func strPtr(s string) *string {
|
||||||
@ -616,7 +616,7 @@ func TestOnlyLocalNodePorts(t *testing.T) {
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
|
|
||||||
// Expect 2 services and 1 destination
|
// Expect 2 services and 1 destination
|
||||||
services, err := ipvs.GetVirtualServers()
|
services, err := ipvs.GetVirtualServers()
|
||||||
@ -700,7 +700,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
}
|
}
|
||||||
|
|
||||||
func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort {
|
func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort {
|
||||||
@ -763,24 +763,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
|||||||
for i := range services {
|
for i := range services {
|
||||||
fp.OnServiceAdd(services[i])
|
fp.OnServiceAdd(services[i])
|
||||||
}
|
}
|
||||||
_, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||||
if len(fp.serviceMap) != 8 {
|
if len(fp.serviceMap) != 8 {
|
||||||
t.Errorf("expected service map length 8, got %v", fp.serviceMap)
|
t.Errorf("expected service map length 8, got %v", fp.serviceMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The only-local-loadbalancer ones get added
|
// The only-local-loadbalancer ones get added
|
||||||
if len(hcPorts) != 1 {
|
if len(result.hcServices) != 1 {
|
||||||
t.Errorf("expected 1 healthcheck port, got %v", hcPorts)
|
t.Errorf("expected 1 healthcheck port, got %v", result.hcServices)
|
||||||
} else {
|
} else {
|
||||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||||
if port, found := hcPorts[nsn]; !found || port != 345 {
|
if port, found := result.hcServices[nsn]; !found || port != 345 {
|
||||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, hcPorts)
|
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.hcServices)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(staleUDPServices) != 0 {
|
if len(result.staleServices) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove some stuff
|
// Remove some stuff
|
||||||
@ -796,24 +796,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
|||||||
fp.OnServiceDelete(services[2])
|
fp.OnServiceDelete(services[2])
|
||||||
fp.OnServiceDelete(services[3])
|
fp.OnServiceDelete(services[3])
|
||||||
|
|
||||||
_, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||||
if len(fp.serviceMap) != 1 {
|
if len(fp.serviceMap) != 1 {
|
||||||
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
|
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(hcPorts) != 0 {
|
if len(result.hcServices) != 0 {
|
||||||
t.Errorf("expected 0 healthcheck ports, got %v", hcPorts)
|
t.Errorf("expected 0 healthcheck ports, got %v", result.hcServices)
|
||||||
}
|
}
|
||||||
|
|
||||||
// All services but one were deleted. While you'd expect only the ClusterIPs
|
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||||
// from the three deleted services here, we still have the ClusterIP for
|
// from the three deleted services here, we still have the ClusterIP for
|
||||||
// the not-deleted service, because one of it's ServicePorts was deleted.
|
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||||
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
|
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
|
||||||
if len(staleUDPServices) != len(expectedStaleUDPServices) {
|
if len(result.staleServices) != len(expectedStaleUDPServices) {
|
||||||
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), staleUDPServices.List())
|
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.staleServices.List())
|
||||||
}
|
}
|
||||||
for _, ip := range expectedStaleUDPServices {
|
for _, ip := range expectedStaleUDPServices {
|
||||||
if !staleUDPServices.Has(ip) {
|
if !result.staleServices.Has(ip) {
|
||||||
t.Errorf("expected stale UDP service service %s", ip)
|
t.Errorf("expected stale UDP service service %s", ip)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -830,21 +830,25 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
|||||||
svc.Spec.ClusterIP = api.ClusterIPNone
|
svc.Spec.ClusterIP = api.ClusterIPNone
|
||||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
|
||||||
}),
|
}),
|
||||||
|
makeTestService("somewhere-else", "headless-without-port", func(svc *api.Service) {
|
||||||
|
svc.Spec.Type = api.ServiceTypeClusterIP
|
||||||
|
svc.Spec.ClusterIP = api.ClusterIPNone
|
||||||
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
// Headless service should be ignored
|
// Headless service should be ignored
|
||||||
_, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||||
if len(fp.serviceMap) != 0 {
|
if len(fp.serviceMap) != 0 {
|
||||||
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
|
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
|
||||||
}
|
}
|
||||||
|
|
||||||
// No proxied services, so no healthchecks
|
// No proxied services, so no healthchecks
|
||||||
if len(hcPorts) != 0 {
|
if len(result.hcServices) != 0 {
|
||||||
t.Errorf("expected healthcheck ports length 0, got %d", len(hcPorts))
|
t.Errorf("expected healthcheck ports length 0, got %d", len(result.hcServices))
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(staleUDPServices) != 0 {
|
if len(result.staleServices) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -862,16 +866,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
_, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||||
if len(fp.serviceMap) != 0 {
|
if len(fp.serviceMap) != 0 {
|
||||||
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
|
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
|
||||||
}
|
}
|
||||||
// No proxied services, so no healthchecks
|
// No proxied services, so no healthchecks
|
||||||
if len(hcPorts) != 0 {
|
if len(result.hcServices) != 0 {
|
||||||
t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
|
t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices)
|
||||||
}
|
}
|
||||||
if len(staleUDPServices) != 0 {
|
if len(result.staleServices) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices)
|
t.Errorf("expected stale UDP services length 0, got %v", result.staleServices)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -903,69 +907,57 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
|
|
||||||
fp.OnServiceAdd(servicev1)
|
fp.OnServiceAdd(servicev1)
|
||||||
|
|
||||||
syncRequired, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||||
if !syncRequired {
|
|
||||||
t.Errorf("expected sync required, got %t", syncRequired)
|
|
||||||
}
|
|
||||||
if len(fp.serviceMap) != 2 {
|
if len(fp.serviceMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
||||||
}
|
}
|
||||||
if len(hcPorts) != 0 {
|
if len(result.hcServices) != 0 {
|
||||||
t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
|
t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices)
|
||||||
}
|
}
|
||||||
if len(staleUDPServices) != 0 {
|
if len(result.staleServices) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Change service to load-balancer
|
// Change service to load-balancer
|
||||||
fp.OnServiceUpdate(servicev1, servicev2)
|
fp.OnServiceUpdate(servicev1, servicev2)
|
||||||
syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||||
if !syncRequired {
|
|
||||||
t.Errorf("expected sync required, got %t", syncRequired)
|
|
||||||
}
|
|
||||||
if len(fp.serviceMap) != 2 {
|
if len(fp.serviceMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
||||||
}
|
}
|
||||||
if len(hcPorts) != 1 {
|
if len(result.hcServices) != 1 {
|
||||||
t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
|
t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices)
|
||||||
}
|
}
|
||||||
if len(staleUDPServices) != 0 {
|
if len(result.staleServices) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
|
t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List())
|
||||||
}
|
}
|
||||||
|
|
||||||
// No change; make sure the service map stays the same and there are
|
// No change; make sure the service map stays the same and there are
|
||||||
// no health-check changes
|
// no health-check changes
|
||||||
fp.OnServiceUpdate(servicev2, servicev2)
|
fp.OnServiceUpdate(servicev2, servicev2)
|
||||||
syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||||
if syncRequired {
|
|
||||||
t.Errorf("not expected sync required, got %t", syncRequired)
|
|
||||||
}
|
|
||||||
if len(fp.serviceMap) != 2 {
|
if len(fp.serviceMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
||||||
}
|
}
|
||||||
if len(hcPorts) != 1 {
|
if len(result.hcServices) != 1 {
|
||||||
t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
|
t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices)
|
||||||
}
|
}
|
||||||
if len(staleUDPServices) != 0 {
|
if len(result.staleServices) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
|
t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List())
|
||||||
}
|
}
|
||||||
|
|
||||||
// And back to ClusterIP
|
// And back to ClusterIP
|
||||||
fp.OnServiceUpdate(servicev2, servicev1)
|
fp.OnServiceUpdate(servicev2, servicev1)
|
||||||
syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||||
if !syncRequired {
|
|
||||||
t.Errorf("expected sync required, got %t", syncRequired)
|
|
||||||
}
|
|
||||||
if len(fp.serviceMap) != 2 {
|
if len(fp.serviceMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
||||||
}
|
}
|
||||||
if len(hcPorts) != 0 {
|
if len(result.hcServices) != 0 {
|
||||||
t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
|
t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices)
|
||||||
}
|
}
|
||||||
if len(staleUDPServices) != 0 {
|
if len(result.staleServices) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1005,7 +997,7 @@ func TestSessionAffinity(t *testing.T) {
|
|||||||
)
|
)
|
||||||
makeEndpointsMap(fp)
|
makeEndpointsMap(fp)
|
||||||
|
|
||||||
fp.syncProxyRules(syncReasonForce)
|
fp.syncProxyRules()
|
||||||
|
|
||||||
// check ipvs service and destinations
|
// check ipvs service and destinations
|
||||||
services, err := ipvs.GetVirtualServers()
|
services, err := ipvs.GetVirtualServers()
|
||||||
@ -1027,8 +1019,11 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Test_updateEndpointsMap(t *testing.T) {
|
func Test_updateEndpointsMap(t *testing.T) {
|
||||||
var nodeName = "host"
|
var nodeName = testHostname
|
||||||
|
|
||||||
|
emptyEndpoint := func(ept *api.Endpoints) {
|
||||||
|
ept.Subsets = []api.EndpointSubset{}
|
||||||
|
}
|
||||||
unnamedPort := func(ept *api.Endpoints) {
|
unnamedPort := func(ept *api.Endpoints) {
|
||||||
ept.Subsets = []api.EndpointSubset{{
|
ept.Subsets = []api.EndpointSubset{{
|
||||||
Addresses: []api.EndpointAddress{{
|
Addresses: []api.EndpointAddress{{
|
||||||
@ -1333,18 +1328,20 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
// previousEndpoints and currentEndpoints are used to call appropriate
|
// previousEndpoints and currentEndpoints are used to call appropriate
|
||||||
// handlers OnEndpoints* (based on whether corresponding values are nil
|
// handlers OnEndpoints* (based on whether corresponding values are nil
|
||||||
// or non-nil) and must be of equal length.
|
// or non-nil) and must be of equal length.
|
||||||
previousEndpoints []*api.Endpoints
|
previousEndpoints []*api.Endpoints
|
||||||
currentEndpoints []*api.Endpoints
|
currentEndpoints []*api.Endpoints
|
||||||
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
|
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
|
||||||
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
||||||
expectedStale []endpointServicePair
|
expectedStaleEndpoints []endpointServicePair
|
||||||
expectedHealthchecks map[types.NamespacedName]int
|
expectedStaleServiceNames map[proxy.ServicePortName]bool
|
||||||
|
expectedHealthchecks map[types.NamespacedName]int
|
||||||
}{{
|
}{{
|
||||||
// Case[0]: nothing
|
// Case[0]: nothing
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||||
expectedStale: []endpointServicePair{},
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
|
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[1]: no change, unnamed port
|
// Case[1]: no change, unnamed port
|
||||||
previousEndpoints: []*api.Endpoints{
|
previousEndpoints: []*api.Endpoints{
|
||||||
@ -1355,16 +1352,17 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", ""): {
|
makeServicePortName("ns1", "ep1", ""): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", ""): {
|
makeServicePortName("ns1", "ep1", ""): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{},
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
|
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[2]: no change, named port, local
|
// Case[2]: no change, named port, local
|
||||||
previousEndpoints: []*api.Endpoints{
|
previousEndpoints: []*api.Endpoints{
|
||||||
@ -1375,15 +1373,16 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", true},
|
{endpoint: "1.1.1.1:11", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", true},
|
{endpoint: "1.1.1.1:11", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{},
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{
|
expectedHealthchecks: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -1397,22 +1396,23 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.2:12", false},
|
{endpoint: "1.1.1.2:12", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.2:12", false},
|
{endpoint: "1.1.1.2:12", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{},
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
|
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[4]: no change, multiple subsets, multiple ports, local
|
// Case[4]: no change, multiple subsets, multiple ports, local
|
||||||
previousEndpoints: []*api.Endpoints{
|
previousEndpoints: []*api.Endpoints{
|
||||||
@ -1423,32 +1423,33 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", true},
|
{endpoint: "1.1.1.1:11", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.1:12", true},
|
{endpoint: "1.1.1.1:12", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p13"): {
|
makeServicePortName("ns1", "ep1", "p13"): {
|
||||||
{"1.1.1.3:13", false},
|
{endpoint: "1.1.1.3:13", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", true},
|
{endpoint: "1.1.1.1:11", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.1:12", true},
|
{endpoint: "1.1.1.1:12", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p13"): {
|
makeServicePortName("ns1", "ep1", "p13"): {
|
||||||
{"1.1.1.3:13", false},
|
{endpoint: "1.1.1.3:13", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{},
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{
|
expectedHealthchecks: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
}, {
|
}, {
|
||||||
// Case[5]: no change, multiple Endpoints, subsets, IPs, and ports
|
// Case[5]: no change, multiple endpoints, subsets, IPs, and ports
|
||||||
previousEndpoints: []*api.Endpoints{
|
previousEndpoints: []*api.Endpoints{
|
||||||
makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
|
makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
|
||||||
makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
|
makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
|
||||||
@ -1459,57 +1460,58 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
{"1.1.1.2:11", true},
|
{endpoint: "1.1.1.2:11", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.1:12", false},
|
{endpoint: "1.1.1.1:12", isLocal: false},
|
||||||
{"1.1.1.2:12", true},
|
{endpoint: "1.1.1.2:12", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p13"): {
|
makeServicePortName("ns1", "ep1", "p13"): {
|
||||||
{"1.1.1.3:13", false},
|
{endpoint: "1.1.1.3:13", isLocal: false},
|
||||||
{"1.1.1.4:13", true},
|
{endpoint: "1.1.1.4:13", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p14"): {
|
makeServicePortName("ns1", "ep1", "p14"): {
|
||||||
{"1.1.1.3:14", false},
|
{endpoint: "1.1.1.3:14", isLocal: false},
|
||||||
{"1.1.1.4:14", true},
|
{endpoint: "1.1.1.4:14", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns2", "ep2", "p21"): {
|
makeServicePortName("ns2", "ep2", "p21"): {
|
||||||
{"2.2.2.1:21", false},
|
{endpoint: "2.2.2.1:21", isLocal: false},
|
||||||
{"2.2.2.2:21", true},
|
{endpoint: "2.2.2.2:21", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns2", "ep2", "p22"): {
|
makeServicePortName("ns2", "ep2", "p22"): {
|
||||||
{"2.2.2.1:22", false},
|
{endpoint: "2.2.2.1:22", isLocal: false},
|
||||||
{"2.2.2.2:22", true},
|
{endpoint: "2.2.2.2:22", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
{"1.1.1.2:11", true},
|
{endpoint: "1.1.1.2:11", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.1:12", false},
|
{endpoint: "1.1.1.1:12", isLocal: false},
|
||||||
{"1.1.1.2:12", true},
|
{endpoint: "1.1.1.2:12", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p13"): {
|
makeServicePortName("ns1", "ep1", "p13"): {
|
||||||
{"1.1.1.3:13", false},
|
{endpoint: "1.1.1.3:13", isLocal: false},
|
||||||
{"1.1.1.4:13", true},
|
{endpoint: "1.1.1.4:13", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p14"): {
|
makeServicePortName("ns1", "ep1", "p14"): {
|
||||||
{"1.1.1.3:14", false},
|
{endpoint: "1.1.1.3:14", isLocal: false},
|
||||||
{"1.1.1.4:14", true},
|
{endpoint: "1.1.1.4:14", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns2", "ep2", "p21"): {
|
makeServicePortName("ns2", "ep2", "p21"): {
|
||||||
{"2.2.2.1:21", false},
|
{endpoint: "2.2.2.1:21", isLocal: false},
|
||||||
{"2.2.2.2:21", true},
|
{endpoint: "2.2.2.2:21", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns2", "ep2", "p22"): {
|
makeServicePortName("ns2", "ep2", "p22"): {
|
||||||
{"2.2.2.1:22", false},
|
{endpoint: "2.2.2.1:22", isLocal: false},
|
||||||
{"2.2.2.2:22", true},
|
{endpoint: "2.2.2.2:22", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{},
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{
|
expectedHealthchecks: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 2,
|
makeNSN("ns1", "ep1"): 2,
|
||||||
makeNSN("ns2", "ep2"): 1,
|
makeNSN("ns2", "ep2"): 1,
|
||||||
@ -1525,10 +1527,13 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", ""): {
|
makeServicePortName("ns1", "ep1", ""): {
|
||||||
{"1.1.1.1:11", true},
|
{endpoint: "1.1.1.1:11", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{},
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||||
|
makeServicePortName("ns1", "ep1", ""): true,
|
||||||
|
},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{
|
expectedHealthchecks: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -1542,15 +1547,16 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", ""): {
|
makeServicePortName("ns1", "ep1", ""): {
|
||||||
{"1.1.1.1:11", true},
|
{endpoint: "1.1.1.1:11", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||||
expectedStale: []endpointServicePair{{
|
expectedStaleEndpoints: []endpointServicePair{{
|
||||||
endpoint: "1.1.1.1:11",
|
endpoint: "1.1.1.1:11",
|
||||||
servicePortName: makeServicePortName("ns1", "ep1", ""),
|
servicePortName: makeServicePortName("ns1", "ep1", ""),
|
||||||
}},
|
}},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
|
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[8]: add an IP and port
|
// Case[8]: add an IP and port
|
||||||
previousEndpoints: []*api.Endpoints{
|
previousEndpoints: []*api.Endpoints{
|
||||||
@ -1561,20 +1567,23 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
{"1.1.1.2:11", true},
|
{endpoint: "1.1.1.2:11", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.1:12", false},
|
{endpoint: "1.1.1.1:12", isLocal: false},
|
||||||
{"1.1.1.2:12", true},
|
{endpoint: "1.1.1.2:12", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{},
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||||
|
makeServicePortName("ns1", "ep1", "p12"): true,
|
||||||
|
},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{
|
expectedHealthchecks: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -1588,20 +1597,20 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
{"1.1.1.2:11", true},
|
{endpoint: "1.1.1.2:11", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.1:12", false},
|
{endpoint: "1.1.1.1:12", isLocal: false},
|
||||||
{"1.1.1.2:12", true},
|
{endpoint: "1.1.1.2:12", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{{
|
expectedStaleEndpoints: []endpointServicePair{{
|
||||||
endpoint: "1.1.1.2:11",
|
endpoint: "1.1.1.2:11",
|
||||||
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
|
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
|
||||||
}, {
|
}, {
|
||||||
@ -1611,7 +1620,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
endpoint: "1.1.1.2:12",
|
endpoint: "1.1.1.2:12",
|
||||||
servicePortName: makeServicePortName("ns1", "ep1", "p12"),
|
servicePortName: makeServicePortName("ns1", "ep1", "p12"),
|
||||||
}},
|
}},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
|
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[10]: add a subset
|
// Case[10]: add a subset
|
||||||
previousEndpoints: []*api.Endpoints{
|
previousEndpoints: []*api.Endpoints{
|
||||||
@ -1622,18 +1632,21 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.2:12", true},
|
{endpoint: "1.1.1.2:12", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{},
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||||
|
makeServicePortName("ns1", "ep1", "p12"): true,
|
||||||
|
},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{
|
expectedHealthchecks: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -1647,22 +1660,23 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.2:12", false},
|
{endpoint: "1.1.1.2:12", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{{
|
expectedStaleEndpoints: []endpointServicePair{{
|
||||||
endpoint: "1.1.1.2:12",
|
endpoint: "1.1.1.2:12",
|
||||||
servicePortName: makeServicePortName("ns1", "ep1", "p12"),
|
servicePortName: makeServicePortName("ns1", "ep1", "p12"),
|
||||||
}},
|
}},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
|
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[12]: rename a port
|
// Case[12]: rename a port
|
||||||
previousEndpoints: []*api.Endpoints{
|
previousEndpoints: []*api.Endpoints{
|
||||||
@ -1673,18 +1687,21 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11-2"): {
|
makeServicePortName("ns1", "ep1", "p11-2"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{{
|
expectedStaleEndpoints: []endpointServicePair{{
|
||||||
endpoint: "1.1.1.1:11",
|
endpoint: "1.1.1.1:11",
|
||||||
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
|
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
|
||||||
}},
|
}},
|
||||||
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||||
|
makeServicePortName("ns1", "ep1", "p11-2"): true,
|
||||||
|
},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[13]: renumber a port
|
// Case[13]: renumber a port
|
||||||
@ -1696,19 +1713,20 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:22", false},
|
{endpoint: "1.1.1.1:22", isLocal: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{{
|
expectedStaleEndpoints: []endpointServicePair{{
|
||||||
endpoint: "1.1.1.1:11",
|
endpoint: "1.1.1.1:11",
|
||||||
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
|
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
|
||||||
}},
|
}},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||||
|
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[14]: complex add and remove
|
// Case[14]: complex add and remove
|
||||||
previousEndpoints: []*api.Endpoints{
|
previousEndpoints: []*api.Endpoints{
|
||||||
@ -1725,42 +1743,42 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns2", "ep2", "p22"): {
|
makeServicePortName("ns2", "ep2", "p22"): {
|
||||||
{"2.2.2.2:22", true},
|
{endpoint: "2.2.2.2:22", isLocal: true},
|
||||||
{"2.2.2.22:22", true},
|
{endpoint: "2.2.2.22:22", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns2", "ep2", "p23"): {
|
makeServicePortName("ns2", "ep2", "p23"): {
|
||||||
{"2.2.2.3:23", true},
|
{endpoint: "2.2.2.3:23", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns4", "ep4", "p44"): {
|
makeServicePortName("ns4", "ep4", "p44"): {
|
||||||
{"4.4.4.4:44", true},
|
{endpoint: "4.4.4.4:44", isLocal: true},
|
||||||
{"4.4.4.5:44", true},
|
{endpoint: "4.4.4.5:44", isLocal: true},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns4", "ep4", "p45"): {
|
makeServicePortName("ns4", "ep4", "p45"): {
|
||||||
{"4.4.4.6:45", true},
|
{endpoint: "4.4.4.6:45", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
makeServicePortName("ns1", "ep1", "p11"): {
|
makeServicePortName("ns1", "ep1", "p11"): {
|
||||||
{"1.1.1.1:11", false},
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
{"1.1.1.11:11", false},
|
{endpoint: "1.1.1.11:11", isLocal: false},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p12"): {
|
makeServicePortName("ns1", "ep1", "p12"): {
|
||||||
{"1.1.1.2:12", false},
|
{endpoint: "1.1.1.2:12", isLocal: false},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns1", "ep1", "p122"): {
|
makeServicePortName("ns1", "ep1", "p122"): {
|
||||||
{"1.1.1.2:122", false},
|
{endpoint: "1.1.1.2:122", isLocal: false},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns3", "ep3", "p33"): {
|
makeServicePortName("ns3", "ep3", "p33"): {
|
||||||
{"3.3.3.3:33", false},
|
{endpoint: "3.3.3.3:33", isLocal: false},
|
||||||
},
|
},
|
||||||
makeServicePortName("ns4", "ep4", "p44"): {
|
makeServicePortName("ns4", "ep4", "p44"): {
|
||||||
{"4.4.4.4:44", true},
|
{endpoint: "4.4.4.4:44", isLocal: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStale: []endpointServicePair{{
|
expectedStaleEndpoints: []endpointServicePair{{
|
||||||
endpoint: "2.2.2.2:22",
|
endpoint: "2.2.2.2:22",
|
||||||
servicePortName: makeServicePortName("ns2", "ep2", "p22"),
|
servicePortName: makeServicePortName("ns2", "ep2", "p22"),
|
||||||
}, {
|
}, {
|
||||||
@ -1776,10 +1794,35 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
endpoint: "4.4.4.6:45",
|
endpoint: "4.4.4.6:45",
|
||||||
servicePortName: makeServicePortName("ns4", "ep4", "p45"),
|
servicePortName: makeServicePortName("ns4", "ep4", "p45"),
|
||||||
}},
|
}},
|
||||||
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||||
|
makeServicePortName("ns1", "ep1", "p12"): true,
|
||||||
|
makeServicePortName("ns1", "ep1", "p122"): true,
|
||||||
|
makeServicePortName("ns3", "ep3", "p33"): true,
|
||||||
|
},
|
||||||
expectedHealthchecks: map[types.NamespacedName]int{
|
expectedHealthchecks: map[types.NamespacedName]int{
|
||||||
makeNSN("ns4", "ep4"): 1,
|
makeNSN("ns4", "ep4"): 1,
|
||||||
},
|
},
|
||||||
}}
|
}, {
|
||||||
|
// Case[15]: change from 0 endpoint address to 1 unnamed port
|
||||||
|
previousEndpoints: []*api.Endpoints{
|
||||||
|
makeTestEndpoints("ns1", "ep1", emptyEndpoint),
|
||||||
|
},
|
||||||
|
currentEndpoints: []*api.Endpoints{
|
||||||
|
makeTestEndpoints("ns1", "ep1", unnamedPort),
|
||||||
|
},
|
||||||
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||||
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||||
|
makeServicePortName("ns1", "ep1", ""): {
|
||||||
|
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedStaleEndpoints: []endpointServicePair{},
|
||||||
|
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||||
|
makeServicePortName("ns1", "ep1", ""): true,
|
||||||
|
},
|
||||||
|
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
for tci, tc := range testCases {
|
for tci, tc := range testCases {
|
||||||
ipt := iptablestest.NewFake()
|
ipt := iptablestest.NewFake()
|
||||||
@ -1787,7 +1830,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
fp := NewFakeProxier(ipt, ipvs, nil)
|
fp := NewFakeProxier(ipt, ipvs, nil)
|
||||||
fp.hostname = nodeName
|
fp.hostname = nodeName
|
||||||
|
|
||||||
// First check that after adding all previous versions of Endpoints,
|
// First check that after adding all previous versions of endpoints,
|
||||||
// the fp.oldEndpoints is as we expect.
|
// the fp.oldEndpoints is as we expect.
|
||||||
for i := range tc.previousEndpoints {
|
for i := range tc.previousEndpoints {
|
||||||
if tc.previousEndpoints[i] != nil {
|
if tc.previousEndpoints[i] != nil {
|
||||||
@ -1799,7 +1842,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
|
|
||||||
// Now let's call appropriate handlers to get to state we want to be.
|
// Now let's call appropriate handlers to get to state we want to be.
|
||||||
if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
|
if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
|
||||||
t.Fatalf("[%d] different lengths of previous and current Endpoints", tci)
|
t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1814,19 +1857,27 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
fp.OnEndpointsUpdate(prev, curr)
|
fp.OnEndpointsUpdate(prev, curr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, hcEndpoints, stale := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname)
|
result := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname)
|
||||||
newMap := fp.endpointsMap
|
newMap := fp.endpointsMap
|
||||||
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
||||||
if len(stale) != len(tc.expectedStale) {
|
if len(result.staleEndpoints) != len(tc.expectedStaleEndpoints) {
|
||||||
t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(stale), stale)
|
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.staleEndpoints), result.staleEndpoints)
|
||||||
}
|
}
|
||||||
for _, x := range tc.expectedStale {
|
for _, x := range tc.expectedStaleEndpoints {
|
||||||
if stale[x] != true {
|
if result.staleEndpoints[x] != true {
|
||||||
t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, stale)
|
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.staleEndpoints)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(hcEndpoints, tc.expectedHealthchecks) {
|
if len(result.staleServiceNames) != len(tc.expectedStaleServiceNames) {
|
||||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, hcEndpoints)
|
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.staleServiceNames), result.staleServiceNames)
|
||||||
|
}
|
||||||
|
for svcName := range tc.expectedStaleServiceNames {
|
||||||
|
if result.staleServiceNames[svcName] != true {
|
||||||
|
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.staleServiceNames)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(result.hcEndpoints, tc.expectedHealthchecks) {
|
||||||
|
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.hcEndpoints)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user