Merge pull request #41022 from thockin/proxy-defer-on-update-events

Automatic merge from submit-queue (batch tested with PRs 41505, 41484, 41544, 41514, 41022)

Proxy defer on update events

This PR is a series of discrete movements in refactoring some of kube-proxy's twistier code in prep to be more async.  It should be reviewed one commit at a time.  Each commit is a smallish movement, which should be easier to examine.  I added significant tests along the way, which, unsurprisingly, found some bugs.
This commit is contained in:
Kubernetes Submit Queue 2017-02-16 14:28:24 -08:00 committed by GitHub
commit 4ac2749af6
3 changed files with 1179 additions and 236 deletions

View File

@ -20,7 +20,6 @@ go_library(
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/slice:go_default_library",
"//pkg/util/sysctl:go_default_library",
"//pkg/util/version:go_default_library",
"//vendor:github.com/davecgh/go-spew/spew",
@ -46,9 +45,11 @@ go_test(
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library",
"//vendor:github.com/davecgh/go-spew/spew",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
],
)

View File

@ -48,7 +48,6 @@ import (
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/slice"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
utilversion "k8s.io/kubernetes/pkg/util/version"
)
@ -150,8 +149,8 @@ type serviceInfo struct {
// internal struct for endpoints information
type endpointsInfo struct {
ip string
localEndpoint bool
endpoint string // TODO: should be an endpointString type
isLocal bool
}
// returns a new serviceInfo struct
@ -191,13 +190,13 @@ type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
// Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
endpointsMap map[proxy.ServicePortName][]*endpointsInfo
portsMap map[localPort]closeable
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
throttle flowcontrol.RateLimiter
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
endpointsMap map[proxy.ServicePortName][]*endpointsInfo
portsMap map[localPort]closeable
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
allEndpoints []api.Endpoints // nil until we have seen an OnEndpointsUpdate event
throttle flowcontrol.RateLimiter
// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
@ -211,6 +210,7 @@ type Proxier struct {
nodeIP net.IP
portMapper portOpener
recorder record.EventRecorder
healthChecker healthChecker
}
type localPort struct {
@ -242,6 +242,17 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
return openLocalPort(lp)
}
type healthChecker interface {
UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String)
}
// TODO: the healthcheck pkg should offer a type
type globalHealthChecker struct{}
func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {
healthcheck.UpdateEndpoints(serviceName, endpointUIDs)
}
// Proxier implements ProxyProvider
var _ proxy.ProxyProvider = &Proxier{}
@ -295,6 +306,7 @@ func NewProxier(ipt utiliptables.Interface,
glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
}
healthChecker := globalHealthChecker{}
go healthcheck.Run()
var throttle flowcontrol.RateLimiter
@ -321,6 +333,7 @@ func NewProxier(ipt utiliptables.Interface,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
}, nil
}
@ -542,15 +555,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
proxier.deleteServiceConnections(staleUDPServices.List())
}
// Generate a list of ip strings from the list of endpoint infos
func flattenEndpointsInfo(endPoints []*endpointsInfo) []string {
var endpointIPs []string
for _, ep := range endPoints {
endpointIPs = append(endpointIPs, ep.ip)
}
return endpointIPs
}
// Reconstruct the list of endpoint infos from the endpointIP list
// Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos
// from the full []hostPortInfo slice.
@ -562,7 +566,7 @@ func flattenEndpointsInfo(endPoints []*endpointsInfo) []string {
// then output will be
//
// []endpointsInfo{ {"2.2.2.2:80", localEndpointOnly=<bool>} }
func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo {
func buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo {
lookupSet := sets.NewString()
for _, ip := range endpointIPs {
lookupSet.Insert(ip)
@ -571,7 +575,7 @@ func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpoint
for _, hpp := range endPoints {
key := net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))
if lookupSet.Has(key) {
filteredEndpoints = append(filteredEndpoints, &endpointsInfo{ip: key, localEndpoint: hpp.localEndpoint})
filteredEndpoints = append(filteredEndpoints, &endpointsInfo{endpoint: key, isLocal: hpp.isLocal})
}
}
return filteredEndpoints
@ -579,92 +583,17 @@ func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpoint
// OnEndpointsUpdate takes in a slice of updated endpoints.
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
start := time.Now()
defer func() {
glog.V(4).Infof("OnEndpointsUpdate took %v for %d endpoints", time.Since(start), len(allEndpoints))
}()
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.haveReceivedEndpointsUpdate = true
activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
staleConnections := make(map[endpointServicePair]bool)
svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo)
newEndpointsMap := make(map[proxy.ServicePortName][]*endpointsInfo)
// Update endpoints for services.
for i := range allEndpoints {
svcEndpoints := &allEndpoints[i]
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
portsToEndpoints := map[string][]hostPortInfo{}
for i := range svcEndpoints.Subsets {
ss := &svcEndpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
var isLocalEndpoint bool
if addr.NodeName != nil {
isLocalEndpoint = *addr.NodeName == proxier.hostname
isLocalEndpoint = utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && isLocalEndpoint
}
hostPortObject := hostPortInfo{
host: addr.IP,
port: int(port.Port),
localEndpoint: isLocalEndpoint,
}
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject)
}
}
}
for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
svcPortToInfoMap[svcPort] = portsToEndpoints[portname]
curEndpoints := proxier.endpointsMap[svcPort]
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
// Flatten the list of current endpoint infos to just a list of ips as strings
curEndpointIPs := flattenEndpointsInfo(curEndpoints)
if len(curEndpointIPs) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEndpoints) {
glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
// Gather stale connections to removed endpoints
removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEndpoints)
for _, ep := range removedEndpoints {
staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
}
}
// Once the set operations using the list of ips are complete, build the list of endpoint infos
newEndpointsMap[svcPort] = proxier.buildEndpointInfoList(portsToEndpoints[portname], newEndpoints)
activeEndpoints[svcPort] = true
}
}
// Check stale connections against endpoints missing from the update.
for svcPort := range proxier.endpointsMap {
if !activeEndpoints[svcPort] {
glog.V(2).Infof("Removing endpoints for %q", svcPort)
// record endpoints of unactive service to stale connections
for _, ep := range proxier.endpointsMap[svcPort] {
staleConnections[endpointServicePair{endpoint: ep.ip, servicePortName: svcPort}] = true
}
}
if proxier.allEndpoints == nil {
glog.V(2).Info("Received first Endpoints update")
}
proxier.allEndpoints = allEndpoints
// Update service health check
allSvcPorts := make(map[proxy.ServicePortName]bool)
for svcPort := range proxier.endpointsMap {
allSvcPorts[svcPort] = true
}
for svcPort := range newEndpointsMap {
allSvcPorts[svcPort] = true
}
for svcPort := range allSvcPorts {
proxier.updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort])
}
if len(newEndpointsMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newEndpointsMap, proxier.endpointsMap) {
proxier.endpointsMap = newEndpointsMap
// TODO: once service has made this same transform, move this into proxier.syncProxyRules()
newMap, staleConnections := updateEndpoints(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker)
if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) {
proxier.endpointsMap = newMap
proxier.syncProxyRules()
} else {
glog.V(4).Infof("Skipping proxy iptables rule sync on endpoint update because nothing changed")
@ -673,41 +602,127 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
proxier.deleteEndpointConnections(staleConnections)
}
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string,
healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, staleSet map[endpointServicePair]bool) {
// return values
newMap = make(map[proxy.ServicePortName][]*endpointsInfo)
staleSet = make(map[endpointServicePair]bool)
// local
svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo)
// Update endpoints for services.
for i := range allEndpoints {
accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap)
}
// Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP
// and the (ip, port, proto) was removed from the endpoints.
for svcPort, epList := range curMap {
for _, ep := range epList {
stale := true
for i := range newMap[svcPort] {
if *newMap[svcPort][i] == *ep {
stale = false
break
}
}
if stale {
glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint)
staleSet[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
}
}
}
// Update service health check
allSvcPorts := make(map[proxy.ServicePortName]bool)
for svcPort := range curMap {
allSvcPorts[svcPort] = true
}
for svcPort := range newMap {
allSvcPorts[svcPort] = true
}
for svcPort := range allSvcPorts {
updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker)
}
return newMap, staleSet
}
// Gather information about all the endpoint state for a given api.Endpoints.
// This can not report complete info on stale connections because it has limited
// scope - it only knows one Endpoints, but sees the whole current map. That
// cleanup has to be done above.
//
// TODO: this could be simplified:
// - hostPortInfo and endpointsInfo overlap too much
// - the test for this is overlapped by the test for updateEndpoints
// - naming is poor and responsibilities are muddled
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
curEndpoints map[proxy.ServicePortName][]*endpointsInfo,
newEndpoints *map[proxy.ServicePortName][]*endpointsInfo,
svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo) {
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
svcPort := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
Port: port.Name,
}
for i := range ss.Addresses {
addr := &ss.Addresses[i]
hostPortObject := hostPortInfo{
host: addr.IP,
port: int(port.Port),
isLocal: addr.NodeName != nil && *addr.NodeName == hostname,
}
(*svcPortToInfoMap)[svcPort] = append((*svcPortToInfoMap)[svcPort], hostPortObject)
}
}
}
// Decompose the lists of endpoints into details of what was changed for the caller.
for svcPort, hostPortInfos := range *svcPortToInfoMap {
newEPList := flattenValidEndpoints(hostPortInfos)
glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList)
// Once the set operations using the list of ips are complete, build the list of endpoint infos
(*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList)
}
}
// updateHealthCheckEntries - send the new set of local endpoints to the health checker
func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) {
func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo, healthChecker healthChecker) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}
// Use a set instead of a slice to provide deduplication
endpoints := sets.NewString()
for _, portInfo := range hostPorts {
if portInfo.localEndpoint {
if portInfo.isLocal {
// kube-proxy health check only needs local endpoints
endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
}
}
healthcheck.UpdateEndpoints(name, endpoints)
healthChecker.UpdateEndpoints(name, endpoints)
}
// used in OnEndpointsUpdate
type hostPortInfo struct {
host string
port int
localEndpoint bool
host string
port int
isLocal bool
}
func isValidEndpoint(hpp *hostPortInfo) bool {
return hpp.host != "" && hpp.port > 0
}
// Tests whether two slices are equivalent. This sorts both slices in-place.
func slicesEquiv(lhs, rhs []string) bool {
if len(lhs) != len(rhs) {
return false
}
if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) {
return true
}
return false
}
func flattenValidEndpoints(endpoints []hostPortInfo) []string {
// Convert Endpoint objects into strings for easier use later.
var result []string
@ -762,11 +777,6 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
}
// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints
func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string {
return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List()
}
type endpointServicePair struct {
endpoint string
servicePortName proxy.ServicePortName
@ -832,7 +842,7 @@ func (proxier *Proxier) syncProxyRules() {
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
if proxier.allEndpoints == nil || !proxier.haveReceivedServiceUpdate {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}
@ -881,6 +891,10 @@ func (proxier *Proxier) syncProxyRules() {
}
}
//
// Below this point we will not return until we try to write the iptables rules.
//
// Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingFilterChains := make(map[utiliptables.Chain]string)
@ -1195,7 +1209,7 @@ func (proxier *Proxier) syncProxyRules() {
endpointChains := make([]utiliptables.Chain, 0)
for _, ep := range proxier.endpointsMap[svcName] {
endpoints = append(endpoints, ep)
endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip)
endpointChain := servicePortEndpointChainName(svcName, protocol, ep.endpoint)
endpointChains = append(endpointChains, endpointChain)
// Create the endpoint chain, retaining counters if possible.
@ -1245,14 +1259,14 @@ func (proxier *Proxier) syncProxyRules() {
}
// Handle traffic that loops back to the originator with SNAT.
writeLine(natRules, append(args,
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]),
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].endpoint, ":")[0]),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip)
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].endpoint)
writeLine(natRules, args...)
}
@ -1266,7 +1280,7 @@ func (proxier *Proxier) syncProxyRules() {
localEndpoints := make([]*endpointsInfo, 0)
localEndpointChains := make([]utiliptables.Chain, 0)
for i := range endpointChains {
if endpoints[i].localEndpoint {
if endpoints[i].isLocal {
// These slices parallel each other; must be kept in sync
localEndpoints = append(localEndpoints, endpoints[i])
localEndpointChains = append(localEndpointChains, endpointChains[i])

File diff suppressed because it is too large Load Diff