Merge pull request #44494 from wojtek-t/faster_kube_proxy

Automatic merge from submit-queue (batch tested with PRs 44722, 44704, 44681, 44494, 39732)

Don't rebuild endpoints map in iptables kube-proxy all the time.

@thockin - I think this PR should help with yours (https://github.com/kubernetes/kubernetes/pull/41030) - besides the performance improvements, it clearly defines when an update due to an endpoints change is needed. If we do the same for services (I'm happy to help with it), I think it should be much simpler.

But please take a look and see whether it makes sense from your perspective too.
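To make the pattern easier to follow before diving into the diff: a minimal, self-contained sketch of the change-accumulation idea (simplified stand-in types and names, not the actual kube-proxy code). Each endpoints event records only the object's state from before the first unsynced change and after the latest one, and the sync loop drains that map instead of rebuilding everything from allEndpoints.

```go
// Minimal sketch of the change-accumulation pattern (assumed, simplified
// types; not the actual kube-proxy code).
package main

import "fmt"

type namespacedName struct{ namespace, name string }

// endpointsObject is a simplified stand-in for *api.Endpoints.
type endpointsObject struct {
	ips []string
}

// endpointsChange keeps the state from before the first unsynced change
// (previous) and after the most recent one (current); intermediate states
// are intentionally dropped.
type endpointsChange struct {
	previous *endpointsObject
	current  *endpointsObject
}

// changeTracker plays the role of Proxier.endpointsChanges.
type changeTracker struct {
	changes map[namespacedName]*endpointsChange
}

// record is called from the add/update/delete handlers: an add passes
// previous == nil, a delete passes current == nil.
func (t *changeTracker) record(name namespacedName, previous, current *endpointsObject) {
	change, exists := t.changes[name]
	if !exists {
		change = &endpointsChange{previous: previous}
		t.changes[name] = change
	}
	// previous stays pinned to the pre-change state; only current advances.
	change.current = current
}

// drain hands the accumulated changes to the sync loop and resets the map,
// mirroring what updateEndpointsMap does with proxier.endpointsChanges.
func (t *changeTracker) drain() map[namespacedName]*endpointsChange {
	out := t.changes
	t.changes = make(map[namespacedName]*endpointsChange)
	return out
}

func main() {
	t := &changeTracker{changes: make(map[namespacedName]*endpointsChange)}
	nn := namespacedName{"default", "backend"}

	// OnEndpointsAdd followed by OnEndpointsUpdate before any sync:
	t.record(nn, nil, &endpointsObject{ips: []string{"10.0.0.1"}})
	t.record(nn, &endpointsObject{ips: []string{"10.0.0.1"}},
		&endpointsObject{ips: []string{"10.0.0.1", "10.0.0.2"}})

	for name, c := range t.drain() {
		// previous is still nil (the pre-add state); current is the latest object.
		fmt.Printf("%s/%s: previous=%v current=%v\n", name.namespace, name.name, c.previous, c.current)
	}
}
```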
This commit is contained in:
Kubernetes Submit Queue 2017-04-20 16:01:02 -07:00 committed by GitHub
commit 0a443ba4c7
3 changed files with 701 additions and 407 deletions


@@ -51,6 +51,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)


@@ -195,24 +195,44 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
return info
}
type endpointsMap map[types.NamespacedName]*api.Endpoints
type endpointsChange struct {
previous *api.Endpoints
current *api.Endpoints
}
type endpointsChangeMap map[types.NamespacedName]*endpointsChange
type serviceMap map[types.NamespacedName]*api.Service
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
for svcPort := range other {
em[svcPort] = other[svcPort]
}
}
func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
for svcPort := range other {
delete(em, svcPort)
}
}
// Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
endpointsMap proxyEndpointMap
portsMap map[localPort]closeable
// allServices and allEndpoints should never be modified by proxier - the
endpointsMap proxyEndpointsMap
// endpointsChanges contains all changes to endpoints that happened since
// last syncProxyRules call. For a single object, changes are accumulated,
// i.e. previous is state from before all of them, current is state after
// applying all of those.
endpointsChanges endpointsChangeMap
portsMap map[localPort]closeable
// allServices should never be modified by proxier - the
// pointers are shared with higher layers of kube-proxy. They are guaranteed
// not to be modified in the meantime, but they also must not be modified
// by Proxier.
allEndpoints endpointsMap
allServices serviceMap
allServices serviceMap
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating iptables
@@ -330,24 +350,24 @@ func NewProxier(ipt utiliptables.Interface,
}
return &Proxier{
serviceMap: make(proxyServiceMap),
endpointsMap: make(proxyEndpointMap),
portsMap: make(map[localPort]closeable),
allEndpoints: make(endpointsMap),
allServices: make(serviceMap),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
serviceMap: make(proxyServiceMap),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: make(endpointsChangeMap),
portsMap: make(map[localPort]closeable),
allServices: make(serviceMap),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
}, nil
}
@@ -566,16 +586,32 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.allEndpoints[namespacedName] = endpoints
change, exists := proxier.endpointsChanges[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = nil
proxier.endpointsChanges[namespacedName] = change
}
change.current = endpoints
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsUpdate(_, endpoints *api.Endpoints) {
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.allEndpoints[namespacedName] = endpoints
change, exists := proxier.endpointsChanges[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = oldEndpoints
proxier.endpointsChanges[namespacedName] = change
}
change.current = endpoints
proxier.syncProxyRules(syncReasonEndpoints)
}
@@ -584,7 +620,15 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
delete(proxier.allEndpoints, namespacedName)
change, exists := proxier.endpointsChanges[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = endpoints
proxier.endpointsChanges[namespacedName] = change
}
change.current = nil
proxier.syncProxyRules(syncReasonEndpoints)
}
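The handlers above only record what changed; the updateEndpointsMap hunk below applies those records during a sync. A rough sketch of that apply step, under assumed simplified types and a hypothetical applyChange helper (not a function in the real code): remove the per-service-port entries derived from the previous object, add the ones derived from the current object, and only require a sync when the two projections actually differ.

```go
// Sketch of applying one accumulated change to the per-service-port map
// (assumed simplified types; applyChange is hypothetical).
package main

import (
	"fmt"
	"reflect"
)

// servicePortName is a simplified stand-in for proxy.ServicePortName.
type servicePortName struct{ namespace, service, port string }

// endpointsMapT maps a service port to its backend "ip:port" strings.
type endpointsMapT map[servicePortName][]string

// merge overwrites whole per-port slices with the entries from other.
func (em endpointsMapT) merge(other endpointsMapT) {
	for p := range other {
		em[p] = other[p]
	}
}

// unmerge removes every port that appears in other.
func (em endpointsMapT) unmerge(other endpointsMapT) {
	for p := range other {
		delete(em, p)
	}
}

// applyChange updates em from one object's old and new projections and
// reports whether iptables rules actually need to be rewritten.
func applyChange(em, oldMap, newMap endpointsMapT) (syncRequired bool) {
	if reflect.DeepEqual(oldMap, newMap) {
		return false // nothing actionable changed for this object
	}
	em.unmerge(oldMap)
	em.merge(newMap)
	return true
}

func main() {
	em := endpointsMapT{
		{"default", "web", "http"}: {"10.0.0.1:80"},
		{"default", "db", "sql"}:   {"10.0.1.5:5432"},
	}
	oldMap := endpointsMapT{{"default", "web", "http"}: {"10.0.0.1:80"}}
	newMap := endpointsMapT{{"default", "web", "http"}: {"10.0.0.1:80", "10.0.0.2:80"}}

	fmt.Println("sync required:", applyChange(em, oldMap, newMap))
	fmt.Println("web/http backends:", em[servicePortName{"default", "web", "http"}])
}
```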
@@ -595,45 +639,65 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.syncProxyRules(syncReasonEndpoints)
}
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
// return values
newMap = make(proxyEndpointMap)
hcEndpoints = make(map[types.NamespacedName]int)
// <endpointsMap> is updated by this function (based on the given changes).
// <changes> map is cleared after applying them.
func updateEndpointsMap(
endpointsMap proxyEndpointsMap,
changes *endpointsChangeMap,
hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
syncRequired = false
staleSet = make(map[endpointServicePair]bool)
// Update endpoints for services.
for _, endpoints := range allEndpoints {
accumulateEndpointsMap(endpoints, hostname, &newMap)
for _, change := range *changes {
oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname)
newEndpointsMap := endpointsToEndpointsMap(change.current, hostname)
if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) {
endpointsMap.unmerge(oldEndpointsMap)
endpointsMap.merge(newEndpointsMap)
detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet)
syncRequired = true
}
}
// Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP
// and the (ip, port, proto) was removed from the endpoints.
for svcPort, epList := range curMap {
*changes = make(endpointsChangeMap)
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}
// TODO: If this turns out to be computationally expensive, consider
// computing it incrementally, similarly to endpointsMap.
hcEndpoints = make(map[types.NamespacedName]int)
localIPs := getLocalIPs(endpointsMap)
for nsn, ips := range localIPs {
hcEndpoints[nsn] = len(ips)
}
return syncRequired, hcEndpoints, staleSet
}
// <staleEndpoints> are modified by this function with detected stale
// connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
for svcPort, epList := range oldEndpointsMap {
for _, ep := range epList {
stale := true
for i := range newMap[svcPort] {
if *newMap[svcPort][i] == *ep {
for i := range newEndpointsMap[svcPort] {
if *newEndpointsMap[svcPort][i] == *ep {
stale = false
break
}
}
if stale {
glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint)
staleSet[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
}
}
}
}
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}
// accumulate local IPs per service, ignoring ports
localIPs := map[types.NamespacedName]sets.String{}
for svcPort := range newMap {
for _, ep := range newMap[svcPort] {
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
localIPs := make(map[types.NamespacedName]sets.String)
for svcPort := range endpointsMap {
for _, ep := range endpointsMap[svcPort] {
if ep.isLocal {
nsn := svcPort.NamespacedName
if localIPs[nsn] == nil {
@@ -644,25 +708,19 @@ func buildNewEndpointsMap(allEndpoints endpointsMap, curMap proxyEndpointMap, ho
}
}
}
// produce a count per service
for nsn, ips := range localIPs {
hcEndpoints[nsn] = len(ips)
}
return newMap, hcEndpoints, staleSet
return localIPs
}
// Gather information about all the endpoint state for a given api.Endpoints.
// This can not report complete info on stale connections because it has limited
// scope - it only knows one Endpoints, but sees the whole current map. That
// cleanup has to be done above.
// Translates a single Endpoints object into a proxyEndpointsMap.
// This function is used for incremental updates of endpointsMap.
//
// NOTE: endpoints object should NOT be modified.
//
// TODO: this could be simplified:
// - the test for this is overlapped by the test for buildNewEndpointsMap
// - naming is poor and responsibilities are muddled
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) {
func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap {
if endpoints == nil {
return nil
}
endpointsMap := make(proxyEndpointsMap)
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
for i := range endpoints.Subsets {
@@ -687,17 +745,18 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoi
endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))),
isLocal: addr.NodeName != nil && *addr.NodeName == hostname,
}
(*newEndpoints)[svcPort] = append((*newEndpoints)[svcPort], epInfo)
endpointsMap[svcPort] = append(endpointsMap[svcPort], epInfo)
}
if glog.V(3) {
newEPList := []string{}
for _, ep := range (*newEndpoints)[svcPort] {
for _, ep := range endpointsMap[svcPort] {
newEPList = append(newEPList, ep.endpoint)
}
glog.Infof("Setting endpoints for %q to %+v", svcPort, newEPList)
}
}
}
return endpointsMap
}
// portProtoHash takes the ServicePortName and protocol for a service
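endpointsToEndpointsMap above flattens a single Endpoints object (subsets of addresses and ports) into per-service-port endpoint lists. A self-contained approximation under assumed simplified types follows; the real code works on api.Endpoints and computes isLocal by comparing the address's NodeName with the proxier's hostname.

```go
// Sketch of flattening one Endpoints-like object into per-port endpoint
// lists (assumed simplified types, not the k8s API types).
package main

import (
	"fmt"
	"net"
	"strconv"
)

type address struct {
	ip       string
	nodeName string
}

type port struct {
	name   string
	number int
}

type subset struct {
	addresses []address
	ports     []port
}

type endpoints struct {
	namespace, name string
	subsets         []subset
}

type servicePortName struct{ namespace, service, port string }

type endpointInfo struct {
	endpoint string // "ip:port"
	isLocal  bool
}

// toEndpointsMap explodes subsets[*].addresses x subsets[*].ports into a
// map keyed by service port, the same shape the diff builds.
func toEndpointsMap(ep *endpoints, hostname string) map[servicePortName][]endpointInfo {
	if ep == nil {
		return nil
	}
	out := make(map[servicePortName][]endpointInfo)
	for _, ss := range ep.subsets {
		for _, p := range ss.ports {
			svcPort := servicePortName{ep.namespace, ep.name, p.name}
			for _, addr := range ss.addresses {
				out[svcPort] = append(out[svcPort], endpointInfo{
					endpoint: net.JoinHostPort(addr.ip, strconv.Itoa(p.number)),
					isLocal:  addr.nodeName == hostname,
				})
			}
		}
	}
	return out
}

func main() {
	ep := &endpoints{
		namespace: "default",
		name:      "web",
		subsets: []subset{{
			addresses: []address{{ip: "10.0.0.1", nodeName: "node-a"}, {ip: "10.0.0.2", nodeName: "node-b"}},
			ports:     []port{{name: "http", number: 80}},
		}},
	}
	for svcPort, eps := range toEndpointsMap(ep, "node-a") {
		fmt.Printf("%+v -> %+v\n", svcPort, eps)
	}
}
```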
@@ -784,7 +843,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.endpointsSynced || proxier.allServices == nil {
if !proxier.endpointsSynced || !proxier.servicesSynced {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}
@@ -798,11 +857,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
return
}
// Figure out the new endpoints we need to activate.
newEndpoints, hcEndpoints, staleEndpoints := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname)
endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
if reason == syncReasonEndpoints && reflect.DeepEqual(newEndpoints, proxier.endpointsMap) {
if reason == syncReasonEndpoints && !endpointsSyncRequired {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
return
}
@@ -1157,7 +1216,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
// table doesn't currently have the same per-service structure that
// the nat table does, so we just stick this into the kube-services
// chain.
if len(newEndpoints[svcName]) == 0 {
if len(proxier.endpointsMap[svcName]) == 0 {
writeLine(filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
@@ -1170,7 +1229,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}
// If the service has no endpoints then reject packets.
if len(newEndpoints[svcName]) == 0 {
if len(proxier.endpointsMap[svcName]) == 0 {
writeLine(filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
@@ -1189,7 +1248,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
// These two slices parallel each other - keep in sync
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
for _, ep := range newEndpoints[svcName] {
for _, ep := range proxier.endpointsMap[svcName] {
endpoints = append(endpoints, ep)
endpointChain := servicePortEndpointChainName(svcNameString, protocol, ep.endpoint)
endpointChains = append(endpointChains, endpointChain)
@@ -1379,7 +1438,6 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
// Finish housekeeping.
proxier.serviceMap = newServices
proxier.endpointsMap = newEndpoints
// TODO: these and clearUDPConntrackForPort() could be made more consistent.
utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
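Finally, a sketch of the stale-endpoint detection introduced by this PR (modeled on detectStaleConnections shown earlier, with assumed simplified types): any (service port, ip:port) pair present in the old projection but missing from the new one is flagged so its conntrack state can be cleaned up; per the diff's own TODO, ideally this would be restricted to UDP endpoints.

```go
// Sketch of detecting endpoints removed by a change (assumed simplified
// types; modeled on detectStaleConnections in the diff).
package main

import "fmt"

type servicePortName struct{ namespace, service, port string }

// endpointServicePair identifies one backend of one service port, matching
// the key shape the proxier uses for stale-connection cleanup.
type endpointServicePair struct {
	endpoint        string // "ip:port"
	servicePortName servicePortName
}

// detectStale flags endpoints that existed before the change but are gone
// after it, so their conntrack entries can be cleared and old UDP flows stop
// hitting removed backends.
func detectStale(oldMap, newMap map[servicePortName][]string, stale map[endpointServicePair]bool) {
	for svcPort, oldEps := range oldMap {
		for _, ep := range oldEps {
			found := false
			for _, cur := range newMap[svcPort] {
				if cur == ep {
					found = true
					break
				}
			}
			if !found {
				stale[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
			}
		}
	}
}

func main() {
	web := servicePortName{"default", "web", "http"}
	oldMap := map[servicePortName][]string{web: {"10.0.0.1:80", "10.0.0.2:80"}}
	newMap := map[servicePortName][]string{web: {"10.0.0.2:80"}}

	stale := make(map[endpointServicePair]bool)
	detectStale(oldMap, newMap, stale)
	for pair := range stale {
		fmt.Printf("stale: %v -> %s\n", pair.servicePortName, pair.endpoint)
	}
}
```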

File diff suppressed because it is too large