Mirror of https://github.com/k3s-io/kubernetes.git

Merge pull request #45816 from wojtek-t/no_early_return_sync_proxy_rules

Automatic merge from submit-queue

Change kube-proxy so that we won't perform no-op calls

Commit 8b706690fb
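The heart of this change is visible in the first hunks below: the change-tracking maps now snapshot each object into a pre-computed proxy map, and their `update` methods return whether a net change remains, which lets the event handlers skip `syncProxyRules` for no-op updates. A minimal, dependency-free sketch of that pattern (the `changeMap`/`change` names are hypothetical, and plain `map[string]string` stands in for `proxyServiceMap`/`proxyEndpointsMap`):

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// changeMap tracks pending state changes keyed by object name.
// (Simplified stand-in for this PR's serviceChangeMap/endpointsChangeMap.)
type changeMap struct {
	sync.Mutex
	items map[string]*change
}

type change struct {
	previous map[string]string // snapshot before the first pending update
	current  map[string]string // snapshot after the latest update
}

// update records a transition and reports whether a net change remains.
// If the latest state equals the snapshot taken before the first pending
// update, the entry cancels out and is dropped, so the caller can skip
// an expensive resync entirely.
func (cm *changeMap) update(name string, previous, current map[string]string) bool {
	cm.Lock()
	defer cm.Unlock()
	c, exists := cm.items[name]
	if !exists {
		c = &change{previous: previous}
		cm.items[name] = c
	}
	c.current = current
	if reflect.DeepEqual(c.previous, c.current) {
		delete(cm.items, name) // the updates reverted each other: no-op
		return false
	}
	return true
}

func main() {
	cm := &changeMap{items: make(map[string]*change)}
	before := map[string]string{"port": "80"}
	after := map[string]string{"port": "8080"}

	fmt.Println(cm.update("svc", before, after)) // true: real change, sync needed
	fmt.Println(cm.update("svc", after, before)) // false: change reverted, skip sync
}
```

The key point is that `previous` is captured only once, on the first pending update, so a later update that restores the original state cancels the entry outright.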
pkg/proxy/iptables/proxier.go

@@ -194,18 +194,19 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
 }
 
 type endpointsChange struct {
-	previous *api.Endpoints
-	current  *api.Endpoints
+	previous proxyEndpointsMap
+	current  proxyEndpointsMap
 }
 
 type endpointsChangeMap struct {
 	sync.Mutex
+	hostname string
 	items    map[types.NamespacedName]*endpointsChange
 }
 
 type serviceChange struct {
-	previous *api.Service
-	current  *api.Service
+	previous proxyServiceMap
+	current  proxyServiceMap
 }
 
 type serviceChangeMap struct {
@@ -216,23 +217,34 @@ type serviceChangeMap struct {
 type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
 type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
 
-func newEndpointsChangeMap() endpointsChangeMap {
+func newEndpointsChangeMap(hostname string) endpointsChangeMap {
 	return endpointsChangeMap{
+		hostname: hostname,
 		items:    make(map[types.NamespacedName]*endpointsChange),
 	}
 }
 
-func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) {
+func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool {
 	ecm.Lock()
 	defer ecm.Unlock()
 
 	change, exists := ecm.items[*namespacedName]
 	if !exists {
 		change = &endpointsChange{}
-		change.previous = previous
+		change.previous = endpointsToEndpointsMap(previous, ecm.hostname)
 		ecm.items[*namespacedName] = change
 	}
-	change.current = current
+	change.current = endpointsToEndpointsMap(current, ecm.hostname)
+	if reflect.DeepEqual(change.previous, change.current) {
+		delete(ecm.items, *namespacedName)
+		return false
+	}
+	// TODO: Instead of returning true/false, we should consider returning whether
+	// the map contains some element or not. Currently, if the change is
+	// "reverting" some previous endpoints update, but there are still some other
+	// modified endpoints, we will return false, even though there are some change
+	// to apply.
+	return true
 }
 
 func newServiceChangeMap() serviceChangeMap {
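The TODO above flags a subtlety: `update` reports whether *this object* still has a net change, not whether the whole map does, so a reverting update can return false while other objects' changes are still pending. A sketch of the alternative it suggests, gating the sync on map emptiness instead (hypothetical `changeMap` type and `pendingChanges` helper, not part of this PR):

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical simplification of endpointsChangeMap: items holds one
// pending change per object, keyed by name.
type changeMap struct {
	sync.Mutex
	items map[string]interface{}
}

// pendingChanges reports whether any object still has an unapplied change.
// Gating syncProxyRules on this, rather than on the bool returned by the
// last update call, would avoid the false negative the TODO describes.
func (cm *changeMap) pendingChanges() bool {
	cm.Lock()
	defer cm.Unlock()
	return len(cm.items) > 0
}

func main() {
	cm := &changeMap{items: map[string]interface{}{"other-endpoints": struct{}{}}}
	// Even if the latest update for some object reverted itself (update
	// would return false), the map still holds a change for another object:
	fmt.Println(cm.pendingChanges()) // true
}
```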
@@ -241,17 +253,60 @@ func newServiceChangeMap() serviceChangeMap {
 	}
 }
 
-func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) {
+func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool {
 	scm.Lock()
 	defer scm.Unlock()
 
 	change, exists := scm.items[*namespacedName]
 	if !exists {
 		change = &serviceChange{}
-		change.previous = previous
+		change.previous = serviceToServiceMap(previous)
 		scm.items[*namespacedName] = change
 	}
-	change.current = current
+	change.current = serviceToServiceMap(current)
+	if reflect.DeepEqual(change.previous, change.current) {
+		delete(scm.items, *namespacedName)
+		return false
+	}
+	// TODO: Instead of returning true/false, we should consider returning whether
+	// the map contains some element or not. Currently, if the change is
+	// "reverting" some previous endpoints update, but there are still some other
+	// modified endpoints, we will return false, even though there are some change
+	// to apply.
+	return true
+}
+
+func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
+	existingPorts := sets.NewString()
+	for serviceName, info := range other {
+		existingPorts.Insert(serviceName.Port)
+		_, exists := (*sm)[serviceName]
+		if !exists {
+			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol)
+		} else {
+			glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol)
+		}
+		(*sm)[serviceName] = info
+	}
+	return existingPorts
+}
+
+func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) {
+	for serviceName := range other {
+		if existingPorts.Has(serviceName.Port) {
+			continue
+		}
+		info, exists := (*sm)[serviceName]
+		if exists {
+			glog.V(1).Infof("Removing service %q", serviceName)
+			if info.protocol == api.ProtocolUDP {
+				staleServices.Insert(info.clusterIP.String())
+			}
+			delete(*sm, serviceName)
+		} else {
+			glog.Errorf("Service %q removed, but doesn't exists", serviceName)
+		}
+	}
 }
 
 func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
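`merge` and `unmerge` replace the old per-Service `mergeService`/`unmergeService` pair (deleted further down): `merge` overlays the new state onto the proxy map and reports which ports it touched, and `unmerge` then drops whatever the previous state declared that the new one no longer does. A simplified sketch with plain string maps standing in for `proxyServiceMap` (the real code keys by `proxy.ServicePortName` and also collects stale UDP cluster IPs):

```go
package main

import "fmt"

// merge overlays 'other' onto sm and returns the set of keys it touched.
func merge(sm, other map[string]string) map[string]bool {
	seen := make(map[string]bool)
	for k, v := range other {
		seen[k] = true
		sm[k] = v
	}
	return seen
}

// unmerge deletes entries of 'previous' that merge did not re-add.
func unmerge(sm, previous map[string]string, seen map[string]bool) {
	for k := range previous {
		if seen[k] {
			continue // still present in the new state: keep the merged value
		}
		delete(sm, k)
	}
}

func main() {
	state := map[string]string{"svc/http": "10.0.0.1:80"}
	prev := map[string]string{"svc/http": "10.0.0.1:80", "svc/metrics": "10.0.0.1:9090"}
	curr := map[string]string{"svc/http": "10.0.0.1:8080"} // metrics port removed

	seen := merge(state, curr)
	unmerge(state, prev, seen)
	fmt.Println(state) // map[svc/http:10.0.0.1:8080]
}
```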
@@ -410,7 +465,7 @@ func NewProxier(ipt utiliptables.Interface,
 		serviceMap:       make(proxyServiceMap),
 		serviceChanges:   newServiceChangeMap(),
 		endpointsMap:     make(proxyEndpointsMap),
-		endpointsChanges: newEndpointsChangeMap(),
+		endpointsChanges: newEndpointsChangeMap(hostname),
 		syncPeriod:       syncPeriod,
 		minSyncPeriod:    minSyncPeriod,
 		throttle:         throttle,
@@ -542,30 +597,48 @@ func (proxier *Proxier) SyncLoop() {
 
 func (proxier *Proxier) OnServiceAdd(service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	proxier.serviceChanges.update(&namespacedName, nil, service)
-
-	proxier.syncProxyRules(syncReasonServices)
+	if proxier.serviceChanges.update(&namespacedName, nil, service) {
+		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
+		// services is not finished, it doesn't make sense to call syncProxyRules
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but is not that critical since it can happen only
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonServices)
+	}
 }
 
 func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	proxier.serviceChanges.update(&namespacedName, oldService, service)
-
-	proxier.syncProxyRules(syncReasonServices)
+	if proxier.serviceChanges.update(&namespacedName, oldService, service) {
+		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
+		// services is not finished, it doesn't make sense to call syncProxyRules
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but is not that critical since it can happen only
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonServices)
+	}
 }
 
 func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	proxier.serviceChanges.update(&namespacedName, service, nil)
-
-	proxier.syncProxyRules(syncReasonServices)
+	if proxier.serviceChanges.update(&namespacedName, service, nil) {
+		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
+		// services is not finished, it doesn't make sense to call syncProxyRules
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but is not that critical since it can happen only
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonServices)
+	}
 }
 
 func (proxier *Proxier) OnServiceSynced() {
 	proxier.mu.Lock()
 	proxier.servicesSynced = true
 	proxier.mu.Unlock()
+	// Call it unconditionally - this is called once per lifetime.
 	proxier.syncProxyRules(syncReasonServices)
 }
 
@@ -583,67 +656,6 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool
 	return false
 }
 
-func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) {
-	if service == nil {
-		return false, nil
-	}
-	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if shouldSkipService(svcName, service) {
-		return false, nil
-	}
-	syncRequired := false
-	existingPorts := sets.NewString()
-	for i := range service.Spec.Ports {
-		servicePort := &service.Spec.Ports[i]
-		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
-		existingPorts.Insert(servicePort.Name)
-		info := newServiceInfo(serviceName, servicePort, service)
-		oldInfo, exists := (*sm)[serviceName]
-		equal := reflect.DeepEqual(info, oldInfo)
-		if exists {
-			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
-		} else if !equal {
-			glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
-		}
-		if !equal {
-			(*sm)[serviceName] = info
-			syncRequired = true
-		}
-	}
-	return syncRequired, existingPorts
-}
-
-// <staleServices> are modified by this function with detected stale services.
-func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool {
-	if service == nil {
-		return false
-	}
-	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if shouldSkipService(svcName, service) {
-		return false
-	}
-	syncRequired := false
-	for i := range service.Spec.Ports {
-		servicePort := &service.Spec.Ports[i]
-		if existingPorts.Has(servicePort.Name) {
-			continue
-		}
-		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
-		info, exists := (*sm)[serviceName]
-		if exists {
-			glog.V(1).Infof("Removing service %q", serviceName)
-			if info.protocol == api.ProtocolUDP {
-				staleServices.Insert(info.clusterIP.String())
-			}
-			delete(*sm, serviceName)
-			syncRequired = true
-		} else {
-			glog.Errorf("Service %q removed, but doesn't exists", serviceName)
-		}
-	}
-	return syncRequired
-}
-
 // <serviceMap> is updated by this function (based on the given changes).
 // <changes> map is cleared after applying them.
 func updateServiceMap(
@@ -652,12 +664,16 @@ func updateServiceMap(
 	syncRequired = false
 	staleServices = sets.NewString()
 
+	func() {
+		changes.Lock()
+		defer changes.Unlock()
 		for _, change := range changes.items {
-			mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current)
-			unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices)
-			syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired
+			existingPorts := serviceMap.merge(change.current)
+			serviceMap.unmerge(change.previous, existingPorts, staleServices)
+			syncRequired = true
 		}
 		changes.items = make(map[types.NamespacedName]*serviceChange)
+	}()
 
 	// TODO: If this will appear to be computationally expensive, consider
 	// computing this incrementally similarly to serviceMap.
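Wrapping the loop in `func() { ... }()` is what lets the `changes.Lock()`/`Unlock()` pair move out of `syncProxyRules` (see the hunks near the end): the deferred unlock fires when the anonymous function returns, so the lock covers only the drain-and-reset of `changes.items`, not the work that follows. A generic sketch of the idiom, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

type queue struct {
	sync.Mutex
	items map[string]int
}

// drain takes ownership of the pending items and resets the shared map,
// all under the lock; the immediately invoked function scopes the defer
// so the lock is released before any expensive follow-up work.
func (q *queue) drain() map[string]int {
	var batch map[string]int
	func() {
		q.Lock()
		defer q.Unlock()
		batch = q.items
		q.items = make(map[string]int) // reset under the lock, like changes.items
	}()
	// From here on the lock is no longer held; process batch at leisure.
	return batch
}

func main() {
	q := &queue{items: map[string]int{"a": 1}}
	fmt.Println(q.drain()) // map[a:1]
	fmt.Println(q.drain()) // map[]
}
```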
@@ -673,30 +689,48 @@ func updateServiceMap(
 
 func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
-	proxier.endpointsChanges.update(&namespacedName, nil, endpoints)
-
-	proxier.syncProxyRules(syncReasonEndpoints)
+	if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) {
+		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
+		// services is not finished, it doesn't make sense to call syncProxyRules
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but is not that critical since it can happen only
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonEndpoints)
+	}
 }
 
 func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
-	proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints)
-
-	proxier.syncProxyRules(syncReasonEndpoints)
+	if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) {
+		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
+		// services is not finished, it doesn't make sense to call syncProxyRules
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but is not that critical since it can happen only
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonEndpoints)
+	}
 }
 
 func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
-	proxier.endpointsChanges.update(&namespacedName, endpoints, nil)
-
-	proxier.syncProxyRules(syncReasonEndpoints)
+	if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) {
+		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
+		// services is not finished, it doesn't make sense to call syncProxyRules
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but is not that critical since it can happen only
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonEndpoints)
+	}
 }
 
 func (proxier *Proxier) OnEndpointsSynced() {
 	proxier.mu.Lock()
 	proxier.endpointsSynced = true
 	proxier.mu.Unlock()
+	// Call it unconditionally - this is called once per lifetime.
 	proxier.syncProxyRules(syncReasonEndpoints)
 }
 
@@ -708,17 +742,18 @@ func updateEndpointsMap(
 	hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
 	syncRequired = false
 	staleSet = make(map[endpointServicePair]bool)
 
+	func() {
+		changes.Lock()
+		defer changes.Unlock()
 		for _, change := range changes.items {
-			oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname)
-			newEndpointsMap := endpointsToEndpointsMap(change.current, hostname)
-			if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) {
-				endpointsMap.unmerge(oldEndpointsMap)
-				endpointsMap.merge(newEndpointsMap)
-				detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet)
-				syncRequired = true
-			}
+			endpointsMap.unmerge(change.previous)
+			endpointsMap.merge(change.current)
+			detectStaleConnections(change.previous, change.current, staleSet)
+			syncRequired = true
 		}
 		changes.items = make(map[types.NamespacedName]*endpointsChange)
+	}()
 
 	if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
 		return
@@ -820,6 +855,27 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd
 	return endpointsMap
 }
 
+// Translates single Service object to proxyServiceMap.
+//
+// NOTE: service object should NOT be modified.
+func serviceToServiceMap(service *api.Service) proxyServiceMap {
+	if service == nil {
+		return nil
+	}
+	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+	if shouldSkipService(svcName, service) {
+		return nil
+	}
+
+	serviceMap := make(proxyServiceMap)
+	for i := range service.Spec.Ports {
+		servicePort := &service.Spec.Ports[i]
+		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
+		serviceMap[serviceName] = newServiceInfo(serviceName, servicePort, service)
+	}
+	return serviceMap
+}
+
 // portProtoHash takes the ServicePortName and protocol for a service
 // returns the associated 16 character hash. This is computed by hashing (sha256)
 // then encoding to base32 and truncating to 16 chars. We do this because IPTables
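`serviceToServiceMap` is the per-Service analogue of `endpointsToEndpointsMap`: it fans one Service out into one `serviceInfo` entry per declared port, which is what makes the `reflect.DeepEqual` comparison in `serviceChangeMap.update` meaningful. A dependency-free sketch of that shape (hypothetical stand-in types):

```go
package main

import "fmt"

type port struct {
	Name   string
	Number int
}

type service struct {
	Name  string
	Ports []port
}

// toPortMap produces one entry per declared port, keyed by service and
// port name, loosely mirroring proxy.ServicePortName.
func toPortMap(svc *service) map[string]int {
	if svc == nil {
		return nil // mirrors the nil-Service guard in the real function
	}
	m := make(map[string]int)
	for i := range svc.Ports {
		p := &svc.Ports[i]
		m[svc.Name+":"+p.Name] = p.Number
	}
	return m
}

func main() {
	svc := &service{Name: "web", Ports: []port{{"http", 80}, {"https", 443}}}
	fmt.Println(toPortMap(svc)) // map[web:http:80 web:https:443]
}
```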
@@ -914,10 +970,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	}
 
 	// Figure out the new services we need to activate.
-	proxier.serviceChanges.Lock()
 	serviceSyncRequired, hcServices, staleServices := updateServiceMap(
 		proxier.serviceMap, &proxier.serviceChanges)
-	proxier.serviceChanges.Unlock()
 
 	// If this was called because of a services update, but nothing actionable has changed, skip it.
 	if reason == syncReasonServices && !serviceSyncRequired {
@@ -925,10 +979,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 		return
 	}
 
-	proxier.endpointsChanges.Lock()
 	endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
 		proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
-	proxier.endpointsChanges.Unlock()
 
 	// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
 	if reason == syncReasonEndpoints && !endpointsSyncRequired {
pkg/proxy/iptables/proxier_test.go

@@ -388,7 +388,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
 		serviceMap:       make(proxyServiceMap),
 		serviceChanges:   newServiceChangeMap(),
 		endpointsMap:     make(proxyEndpointsMap),
-		endpointsChanges: newEndpointsChangeMap(),
+		endpointsChanges: newEndpointsChangeMap(testHostname),
 		iptables:         ipt,
 		clusterCIDR:      "10.0.0.0/24",
 		hostname:         testHostname,
@@ -1611,7 +1611,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.Serv
 }
 
 func Test_updateEndpointsMap(t *testing.T) {
-	var nodeName = "host"
+	var nodeName = testHostname
 
 	unnamedPort := func(ept *api.Endpoints) {
 		ept.Subsets = []api.EndpointSubset{{