Don't rebuild service map in iptables kube-proxy all the time

This commit is contained in:
Wojciech Tyczynski
2017-04-18 14:51:14 +02:00
parent bf532a30e3
commit c7353432df
3 changed files with 269 additions and 166 deletions

View File

@@ -32,7 +32,6 @@ import (
"sync"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
@@ -199,8 +198,14 @@ type endpointsChange struct {
previous *api.Endpoints
current *api.Endpoints
}
type serviceChange struct {
previous *api.Service
current *api.Service
}
type endpointsChangeMap map[types.NamespacedName]*endpointsChange
type serviceMap map[types.NamespacedName]*api.Service
type serviceChangeMap map[types.NamespacedName]*serviceChange
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
@@ -219,20 +224,23 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
// Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
// serviceChanges contains all changes to services that happened since
// last syncProxyRules call. For a single object, changes are accumulated,
// i.e. previous is state from before all of them, current is state after
// applying all of those.
serviceChanges serviceChangeMap
endpointsMap proxyEndpointsMap
// endpointsChanges contains all changes to endpoints that happened since
// last syncProxyRules call. For a single object, changes are accumulated,
// i.e. previous is state from before all of them, current is state after
// applying all of those.
endpointsChanges endpointsChangeMap
portsMap map[localPort]closeable
// allServices should never be modified by proxier - the
// pointers are shared with higher layers of kube-proxy. They are guaranteed
// to not be modified in the meantime, but also require to be not modified
// by Proxier.
allServices serviceMap
portsMap map[localPort]closeable
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating iptables
@@ -350,11 +358,11 @@ func NewProxier(ipt utiliptables.Interface,
}
return &Proxier{
portsMap: make(map[localPort]closeable),
serviceMap: make(proxyServiceMap),
serviceChanges: make(serviceChangeMap),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: make(endpointsChangeMap),
portsMap: make(map[localPort]closeable),
allServices: make(serviceMap),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
@@ -476,92 +484,37 @@ func (proxier *Proxier) SyncLoop() {
}
}
// Accepts a list of Services and the existing service map. Returns the new
// service map, a map of healthcheck ports, and a set of stale UDP
// services.
func buildNewServiceMap(allServices serviceMap, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) {
newServiceMap := make(proxyServiceMap)
hcPorts := make(map[types.NamespacedName]uint16)
for _, service := range allServices {
svcName := types.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
}
// if ClusterIP is "None" or empty, skip proxying
if !helper.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
continue
}
// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
if service.Spec.Type == api.ServiceTypeExternalName {
glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
continue
}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{
NamespacedName: svcName,
Port: servicePort.Name,
}
info := newServiceInfo(serviceName, servicePort, service)
oldInfo, exists := oldServiceMap[serviceName]
equal := reflect.DeepEqual(info, oldInfo)
if !exists {
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
} else if !equal {
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
}
if info.onlyNodeLocalEndpoints {
hcPorts[svcName] = uint16(info.healthCheckNodePort)
}
newServiceMap[serviceName] = info
glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
}
}
for nsn, port := range hcPorts {
if port == 0 {
glog.Errorf("Service %q has no healthcheck nodeport", nsn)
delete(hcPorts, nsn)
}
}
staleUDPServices := sets.NewString()
// Remove serviceports missing from the update.
for name, info := range oldServiceMap {
if _, exists := newServiceMap[name]; !exists {
glog.V(1).Infof("Removing service %q", name)
if info.protocol == api.ProtocolUDP {
staleUDPServices.Insert(info.clusterIP.String())
}
}
}
return newServiceMap, hcPorts, staleUDPServices
}
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.allServices[namespacedName] = service
change, exists := proxier.serviceChanges[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = nil
proxier.serviceChanges[namespacedName] = change
}
change.current = service
proxier.syncProxyRules(syncReasonServices)
}
func (proxier *Proxier) OnServiceUpdate(_, service *api.Service) {
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.allServices[namespacedName] = service
change, exists := proxier.serviceChanges[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = oldService
proxier.serviceChanges[namespacedName] = change
}
change.current = service
proxier.syncProxyRules(syncReasonServices)
}
@@ -570,7 +523,15 @@ func (proxier *Proxier) OnServiceDelete(service *api.Service) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
delete(proxier.allServices, namespacedName)
change, exists := proxier.serviceChanges[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = service
proxier.serviceChanges[namespacedName] = change
}
change.current = nil
proxier.syncProxyRules(syncReasonServices)
}
@@ -581,6 +542,114 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.syncProxyRules(syncReasonServices)
}
func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool {
// if ClusterIP is "None" or empty, skip proxying
if !helper.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
return true
}
// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
if service.Spec.Type == api.ServiceTypeExternalName {
glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
return true
}
return false
}
func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) {
if service == nil {
return false, nil
}
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if shouldSkipService(svcName, service) {
return false, nil
}
syncRequired := false
existingPorts := sets.NewString()
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
existingPorts.Insert(servicePort.Name)
info := newServiceInfo(serviceName, servicePort, service)
oldInfo, exists := (*sm)[serviceName]
equal := reflect.DeepEqual(info, oldInfo)
if exists {
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
} else if !equal {
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
}
if !equal {
(*sm)[serviceName] = info
syncRequired = true
}
}
return syncRequired, existingPorts
}
// <staleServices> are modified by this function with detected stale services.
func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool {
if service == nil {
return false
}
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if shouldSkipService(svcName, service) {
return false
}
syncRequired := false
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
if existingPorts.Has(servicePort.Name) {
continue
}
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
info, exists := (*sm)[serviceName]
if exists {
glog.V(1).Infof("Removing service %q", serviceName)
if info.protocol == api.ProtocolUDP {
staleServices.Insert(info.clusterIP.String())
}
delete(*sm, serviceName)
syncRequired = true
} else {
glog.Errorf("Service %q removed, but doesn't exists", serviceName)
}
}
return syncRequired
}
// <serviceMap> is updated by this function (based on the given changes).
// <changes> map is cleared after applying them.
func updateServiceMap(
serviceMap proxyServiceMap,
changes *serviceChangeMap) (syncRequired bool, hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
syncRequired = false
staleServices = sets.NewString()
for _, change := range *changes {
mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current)
unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices)
syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired
}
*changes = make(serviceChangeMap)
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to serviceMap.
hcServices = make(map[types.NamespacedName]uint16)
for svcPort, info := range serviceMap {
if info.onlyNodeLocalEndpoints {
hcServices[svcPort.NamespacedName] = uint16(info.healthCheckNodePort)
}
}
for nsn, port := range hcServices {
if port == 0 {
glog.Errorf("Service %q has no healthcheck nodeport", nsn)
delete(hcServices, nsn)
}
}
return syncRequired, hcServices, staleServices
}
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
@@ -849,10 +918,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}
// Figure out the new services we need to activate.
newServices, hcServices, staleServices := buildNewServiceMap(proxier.allServices, proxier.serviceMap)
serviceSyncRequired, hcServices, staleServices := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
// If this was called because of a services update, but nothing actionable has changed, skip it.
if reason == syncReasonServices && reflect.DeepEqual(newServices, proxier.serviceMap) {
if reason == syncReasonServices && !serviceSyncRequired {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
return
}
@@ -996,7 +1066,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
replacementPortsMap := map[localPort]closeable{}
// Build rules for each service.
for svcName, svcInfo := range newServices {
for svcName, svcInfo := range proxier.serviceMap {
protocol := strings.ToLower(string(svcInfo.protocol))
// Precompute svcNameString; with many services the many calls
// to ServicePortName.String() show up in CPU profiles.
@@ -1437,8 +1507,6 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}
// Finish housekeeping.
proxier.serviceMap = newServices
// TODO: these and clearUDPConntrackForPort() could be made more consistent.
utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
proxier.deleteEndpointConnections(staleEndpoints)