rsync ipvs proxier to HEAD of iptables

This commit is contained in:
m1093782566
2017-09-06 16:46:09 +08:00
parent 0076f02df0
commit 44afb09339
2 changed files with 455 additions and 364 deletions

View File

@@ -28,6 +28,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
@@ -35,9 +36,9 @@ import (
clientv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/helper"
apiservice "k8s.io/kubernetes/pkg/api/service"
@@ -45,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
@@ -105,8 +107,8 @@ type Proxier struct {
// with some partial data after kube-proxy restart.
endpointsSynced bool
servicesSynced bool
throttle flowcontrol.RateLimiter
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
@@ -242,23 +244,14 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
var throttle flowcontrol.RateLimiter
// Defaulting back to not limit sync rate when minSyncPeriod is 0.
if minSyncPeriod != 0 {
syncsPerSecond := float32(time.Second) / float32(minSyncPeriod)
// The average use case will process 2 updates in short succession
throttle = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2)
}
return &Proxier{
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(),
endpointsChanges: newEndpointsChangeMap(hostname),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
@@ -276,7 +269,11 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
}, nil
}
burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
return proxier, nil
}
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
@@ -294,37 +291,41 @@ type serviceInfo struct {
loadBalancerSourceRanges []string
onlyNodeLocalEndpoints bool
healthCheckNodePort int
// The following fields are computed and stored for performance reasons.
serviceNameString string
}
// <serviceMap> is updated by this function (based on the given changes).
// <changes> map is cleared after applying them.
func updateServiceMap(
serviceMap proxyServiceMap,
changes *serviceChangeMap) (syncRequired bool, hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
syncRequired = false
staleServices = sets.NewString()
changes *serviceChangeMap) (result updateServiceMapResult) {
result.staleServices = sets.NewString()
for _, change := range changes.items {
mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current)
unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices)
syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired
}
changes.items = make(map[types.NamespacedName]*serviceChange)
func() {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
existingPorts := serviceMap.merge(change.current)
serviceMap.unmerge(change.previous, existingPorts, result.staleServices)
}
changes.items = make(map[types.NamespacedName]*serviceChange)
}()
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to serviceMap.
hcServices = make(map[types.NamespacedName]uint16)
for svcPort, info := range serviceMap {
result.hcServices = make(map[types.NamespacedName]uint16)
for svcPortName, info := range serviceMap {
if info.healthCheckNodePort != 0 {
hcServices[svcPort.NamespacedName] = uint16(info.healthCheckNodePort)
result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
}
}
return syncRequired, hcServices, staleServices
return result
}
// returns a new serviceInfo struct
func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
onlyNodeLocalEndpoints := false
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) &&
apiservice.RequestsOnlyLocalTraffic(service) {
@@ -347,90 +348,77 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
}
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
copy(info.externalIPs, service.Spec.ExternalIPs)
if apiservice.NeedsHealthCheck(service) {
p := service.Spec.HealthCheckNodePort
if p == 0 {
glog.Errorf("Service %q has no healthcheck nodeport", serviceName)
glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String())
} else {
info.healthCheckNodePort = int(p)
}
}
// Store the following for performance reasons.
info.serviceNameString = svcPortName.String()
return info
}
func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) {
if service == nil {
return false, nil
}
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if utilproxy.ShouldSkipService(svcName, service) {
return false, nil
}
syncRequired := false
func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
existingPorts := sets.NewString()
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
existingPorts.Insert(servicePort.Name)
info := newServiceInfo(serviceName, servicePort, service)
oldInfo, exists := (*sm)[serviceName]
equal := reflect.DeepEqual(info, oldInfo)
if exists {
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
} else if !equal {
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
}
if !equal {
(*sm)[serviceName] = info
syncRequired = true
for svcPortName, info := range other {
existingPorts.Insert(svcPortName.Port)
_, exists := (*sm)[svcPortName]
if !exists {
glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
} else {
glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
}
(*sm)[svcPortName] = info
}
return syncRequired, existingPorts
return existingPorts
}
// <staleServices> are modified by this function with detected stale services.
func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool {
if service == nil {
return false
}
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if utilproxy.ShouldSkipService(svcName, service) {
return false
}
syncRequired := false
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
if existingPorts.Has(servicePort.Name) {
func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) {
for svcPortName := range other {
if existingPorts.Has(svcPortName.Port) {
continue
}
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
info, exists := (*sm)[serviceName]
info, exists := (*sm)[svcPortName]
if exists {
glog.V(1).Infof("Removing service %q", serviceName)
glog.V(1).Infof("Removing service port %q", svcPortName)
if info.protocol == api.ProtocolUDP {
staleServices.Insert(info.clusterIP.String())
}
delete(*sm, serviceName)
syncRequired = true
delete(*sm, svcPortName)
} else {
glog.Errorf("Service %q removed, but doesn't exists", serviceName)
glog.Errorf("Service port %q removed, but doesn't exists", svcPortName)
}
}
return syncRequired
}
type serviceChangeMap struct {
sync.Mutex
lock sync.Mutex
items map[types.NamespacedName]*serviceChange
}
type serviceChange struct {
previous *api.Service
current *api.Service
previous proxyServiceMap
current proxyServiceMap
}
type updateEndpointMapResult struct {
hcEndpoints map[types.NamespacedName]int
staleEndpoints map[endpointServicePair]bool
staleServiceNames map[proxy.ServicePortName]bool
}
type updateServiceMapResult struct {
hcServices map[types.NamespacedName]uint16
staleServices sets.String
}
func newServiceChangeMap() serviceChangeMap {
@@ -439,17 +427,42 @@ func newServiceChangeMap() serviceChangeMap {
}
}
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) {
scm.Lock()
defer scm.Unlock()
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool {
scm.lock.Lock()
defer scm.lock.Unlock()
change, exists := scm.items[*namespacedName]
if !exists {
change = &serviceChange{}
change.previous = previous
change.previous = serviceToServiceMap(previous)
scm.items[*namespacedName] = change
}
change.current = current
change.current = serviceToServiceMap(current)
if reflect.DeepEqual(change.previous, change.current) {
delete(scm.items, *namespacedName)
}
return len(scm.items) > 0
}
// Translates single Service object to proxyServiceMap.
//
// NOTE: service object should NOT be modified.
func serviceToServiceMap(service *api.Service) proxyServiceMap {
if service == nil {
return nil
}
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if utilproxy.ShouldSkipService(svcName, service) {
return nil
}
serviceMap := make(proxyServiceMap)
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service)
}
return serviceMap
}
// internal struct for endpoints information
@@ -462,6 +475,14 @@ func (e *endpointsInfo) String() string {
return fmt.Sprintf("%v", *e)
}
// IPPart returns just the IP part of the endpoint.
func (e *endpointsInfo) IPPart() string {
if index := strings.Index(e.endpoint, ":"); index != -1 {
return e.endpoint[0:index]
}
return e.endpoint
}
type endpointServicePair struct {
endpoint string
servicePortName proxy.ServicePortName
@@ -470,33 +491,40 @@ type endpointServicePair struct {
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
type endpointsChange struct {
previous *api.Endpoints
current *api.Endpoints
previous proxyEndpointsMap
current proxyEndpointsMap
}
type endpointsChangeMap struct {
sync.Mutex
items map[types.NamespacedName]*endpointsChange
lock sync.Mutex
hostname string
items map[types.NamespacedName]*endpointsChange
}
// <staleEndpoints> are modified by this function with detected stale
// connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
for svcPort, epList := range oldEndpointsMap {
// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
for svcPortName, epList := range oldEndpointsMap {
for _, ep := range epList {
stale := true
for i := range newEndpointsMap[svcPort] {
if *newEndpointsMap[svcPort][i] == *ep {
for i := range newEndpointsMap[svcPortName] {
if *newEndpointsMap[svcPortName][i] == *ep {
stale = false
break
}
}
if stale {
glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint)
staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint)
staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true
}
}
}
for svcPortName, epList := range newEndpointsMap {
// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
staleServiceNames[svcPortName] = true
}
}
}
// <endpointsMap> is updated by this function (based on the given changes).
@@ -504,20 +532,20 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap,
func updateEndpointsMap(
endpointsMap proxyEndpointsMap,
changes *endpointsChangeMap,
hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
syncRequired = false
staleSet = make(map[endpointServicePair]bool)
for _, change := range changes.items {
oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname)
newEndpointsMap := endpointsToEndpointsMap(change.current, hostname)
if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) {
endpointsMap.unmerge(oldEndpointsMap)
endpointsMap.merge(newEndpointsMap)
detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet)
syncRequired = true
hostname string) (result updateEndpointMapResult) {
result.staleEndpoints = make(map[endpointServicePair]bool)
result.staleServiceNames = make(map[proxy.ServicePortName]bool)
func() {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
endpointsMap.unmerge(change.previous)
endpointsMap.merge(change.current)
detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames)
}
}
changes.items = make(map[types.NamespacedName]*endpointsChange)
changes.items = make(map[types.NamespacedName]*endpointsChange)
}()
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
@@ -525,13 +553,13 @@ func updateEndpointsMap(
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to endpointsMap.
hcEndpoints = make(map[types.NamespacedName]int)
result.hcEndpoints = make(map[types.NamespacedName]int)
localIPs := getLocalIPs(endpointsMap)
for nsn, ips := range localIPs {
hcEndpoints[nsn] = len(ips)
result.hcEndpoints[nsn] = len(ips)
}
return syncRequired, hcEndpoints, staleSet
return result
}
// Translates single Endpoints object to proxyEndpointsMap.
@@ -582,23 +610,28 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd
return endpointsMap
}
func newEndpointsChangeMap() endpointsChangeMap {
func newEndpointsChangeMap(hostname string) endpointsChangeMap {
return endpointsChangeMap{
items: make(map[types.NamespacedName]*endpointsChange),
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange),
}
}
func (ecm *endpointsChangeMap) Update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) {
ecm.Lock()
defer ecm.Unlock()
func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool {
ecm.lock.Lock()
defer ecm.lock.Unlock()
change, exists := ecm.items[*namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = previous
change.previous = endpointsToEndpointsMap(previous, ecm.hostname)
ecm.items[*namespacedName] = change
}
change.current = current
change.current = endpointsToEndpointsMap(current, ecm.hostname)
if reflect.DeepEqual(change.previous, change.current) {
delete(ecm.items, *namespacedName)
}
return len(ecm.items) > 0
}
func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
@@ -726,81 +759,89 @@ func CleanupLeftovers(execer utilexec.Interface, ipvs utilipvs.Interface, ipt ut
return encounteredError
}
// Sync is called to immediately synchronize the proxier state to iptables
// Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
func (proxier *Proxier) Sync() {
proxier.syncProxyRules(syncReasonForce)
proxier.syncRunner.Run()
}
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
t := time.NewTicker(proxier.syncPeriod)
defer t.Stop()
// Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}
for {
<-t.C
glog.V(6).Infof("Periodic sync")
proxier.Sync()
proxier.syncRunner.Loop(wait.NeverStop)
}
func (proxier *Proxier) setInitialized(value bool) {
var initialized int32
if value {
initialized = 1
}
atomic.StoreInt32(&proxier.initialized, initialized)
}
func (proxier *Proxier) isInitialized() bool {
return atomic.LoadInt32(&proxier.initialized) > 0
}
// OnServiceAdd is called whenever creation of new service object is observed.
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.serviceChanges.update(&namespacedName, nil, service)
proxier.syncProxyRules(syncReasonServices)
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
// OnServiceUpdate is called whenever modification of an existing service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.serviceChanges.update(&namespacedName, oldService, service)
proxier.syncProxyRules(syncReasonServices)
if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
// OnServiceDelete is called whenever deletion of an existing service object is observed.
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.serviceChanges.update(&namespacedName, service, nil)
proxier.syncProxyRules(syncReasonServices)
if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
// OnServiceSynced is called once all the initial even handlers were called and the state is fully propagated to local cache.
func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
proxier.servicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.mu.Unlock()
proxier.syncProxyRules(syncReasonServices)
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
// OnEndpointsAdd is called whenever creation of new endpoints object is observed.
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.Update(&namespacedName, nil, endpoints)
proxier.syncProxyRules(syncReasonEndpoints)
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.Update(&namespacedName, oldEndpoints, endpoints)
proxier.syncProxyRules(syncReasonEndpoints)
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
// OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.Update(&namespacedName, endpoints, nil)
proxier.syncProxyRules(syncReasonEndpoints)
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
// OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
@@ -809,7 +850,7 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.endpointsSynced = true
proxier.mu.Unlock()
proxier.syncProxyRules(syncReasonEndpoints)
proxier.syncProxyRules()
}
type syncReason string
@@ -820,16 +861,13 @@ const syncReasonForce syncReason = "Force"
// This is where all of the ipvs calls happen.
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules(reason syncReason) {
func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
if proxier.throttle != nil {
proxier.throttle.Accept()
}
start := time.Now()
defer func() {
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.endpointsSynced || !proxier.servicesSynced {
@@ -837,27 +875,21 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
return
}
// Figure out the new services we need to activate.
proxier.serviceChanges.Lock()
serviceSyncRequired, hcServices, staleServices := updateServiceMap(
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
serviceUpdateResult := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
proxier.serviceChanges.Unlock()
// If this was called because of a services update, but nothing actionable has changed, skip it.
if reason == syncReasonServices && !serviceSyncRequired {
glog.V(3).Infof("Skipping ipvs sync because nothing changed")
return
}
proxier.endpointsChanges.Lock()
endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
endpointUpdateResult := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
proxier.endpointsChanges.Unlock()
// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
if reason == syncReasonEndpoints && !endpointsSyncRequired {
glog.V(3).Infof("Skipping ipvs sync because nothing changed")
return
staleServices := serviceUpdateResult.staleServices
// merge stale services gathered from updateEndpointsMap
for svcPortName := range endpointUpdateResult.staleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP {
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
staleServices.Insert(svcInfo.clusterIP.String())
}
}
glog.V(3).Infof("Syncing ipvs Proxier rules")
@@ -1166,13 +1198,14 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
// Sync iptables rules.
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
natLines := append(proxier.natChains.Bytes(), proxier.natRules.Bytes()...)
lines := natLines
proxier.iptablesData.Reset()
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())
glog.V(3).Infof("Restoring iptables rules: %s", lines)
err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, lines)
glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
// Revert new local ports.
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return
@@ -1197,18 +1230,18 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
// Update healthz timestamp if it is periodic sync.
if proxier.healthzServer != nil && reason == syncReasonForce {
// Update healthz timestamp
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}
// Update healthchecks. The endpoints list might include services that are
// not "OnlyLocal", but the services list will not, and the healthChecker
// will just drop those endpoints.
if err := proxier.healthChecker.SyncServices(hcServices); err != nil {
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
glog.Errorf("Error syncing healtcheck services: %v", err)
}
if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
glog.Errorf("Error syncing healthcheck endpoints: %v", err)
}
@@ -1219,7 +1252,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
}
}
proxier.deleteEndpointConnections(staleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
}
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
@@ -1408,7 +1441,15 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables.
// Join all words with spaces, terminate with newline and write to buff.
func writeLine(buf *bytes.Buffer, words ...string) {
buf.WriteString(strings.Join(words, " ") + "\n")
// We avoid strings.Join for performance reasons.
for i := range words {
buf.WriteString(words[i])
if i < len(words)-1 {
buf.WriteByte(' ')
} else {
buf.WriteByte('\n')
}
}
}
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
@@ -1420,8 +1461,7 @@ func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.S
if localIPs[nsn] == nil {
localIPs[nsn] = sets.NewString()
}
ip := strings.Split(ep.endpoint, ":")[0] // just the IP part
localIPs[nsn].Insert(ip)
localIPs[nsn].Insert(ep.IPPart()) // just the IP part
}
}
}