Merge pull request #42747 from dcbw/iptables-proxy-optimize
Automatic merge from submit-queue (batch tested with PRs 42747, 43030)

kube-proxy/iptables: optimize endpoint map creation by excluding invalid endpoints earlier

We don't need to do as much work as we were doing if we exclude invalid endpoints earlier in the endpoints processing.

Fixes: https://github.com/kubernetes/kubernetes/issues/42210

@freehan @liggitt @thockin if you could review this with a fine-toothed comb... I can't immediately think of why invalid endpoints would be useful for the HealthChecker, and this PR prevents the HC from seeing these endpoints.
commit 1b4433d4c3
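Before the diff itself, here is a minimal, runnable Go sketch of the pattern the PR adopts: validate each endpoint as it is produced and skip invalid entries (zero port, empty host) immediately, rather than accumulating everything into an intermediate list and filtering it afterwards. The types and helper below (subset, endpointsInfo, accumulate) are simplified stand-ins invented for illustration, not the actual kube-proxy types.

package main

import (
	"fmt"
	"net"
	"strconv"
)

// subset is a simplified stand-in for api.EndpointSubset: a set of IPs
// that all share one port. The real endpointsInfo also records whether
// the endpoint is local to this node; that is omitted here.
type subset struct {
	ips  []string
	port int
}

type endpointsInfo struct {
	endpoint string // "host:port"
}

// accumulate validates each endpoint as it is encountered and skips
// invalid entries (zero port, empty IP) up front, so no second
// filtering pass is needed and downstream consumers never see them.
func accumulate(subsets []subset, out *[]*endpointsInfo) {
	for _, ss := range subsets {
		if ss.port == 0 {
			continue // invalid port: excluded immediately
		}
		for _, ip := range ss.ips {
			if ip == "" {
				continue // invalid host: excluded immediately
			}
			*out = append(*out, &endpointsInfo{
				endpoint: net.JoinHostPort(ip, strconv.Itoa(ss.port)),
			})
		}
	}
}

func main() {
	var eps []*endpointsInfo
	accumulate([]subset{
		{ips: []string{"1.1.1.1", ""}, port: 22}, // empty IP dropped
		{ips: []string{"2.2.2.2"}, port: 0},      // zero port dropped
	}, &eps)
	for _, ep := range eps {
		fmt.Println(ep.endpoint) // prints only "1.1.1.1:22"
	}
}

Because invalid entries never enter the endpoints map in the first place, everything downstream of it, including the health checker, only ever sees valid endpoints; that is the behavioral point called out in the merge description above.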
@@ -188,12 +188,14 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
 type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
 
+type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo
+
 // Proxier is an iptables based proxy for connections between a localhost:lport
 // and services that provide the actual backends.
 type Proxier struct {
 	mu           sync.Mutex // protects the following fields
 	serviceMap   proxyServiceMap
-	endpointsMap map[proxy.ServicePortName][]*endpointsInfo
+	endpointsMap proxyEndpointMap
 	portsMap     map[localPort]closeable
 	haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
 	allEndpoints []api.Endpoints // nil until we have seen an OnEndpointsUpdate event
@@ -320,7 +322,7 @@ func NewProxier(ipt utiliptables.Interface,
 	return &Proxier{
 		serviceMap:   make(proxyServiceMap),
-		endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
+		endpointsMap: make(proxyEndpointMap),
 		portsMap:     make(map[localPort]closeable),
 		syncPeriod:   syncPeriod,
 		minSyncPeriod: minSyncPeriod,
@@ -556,32 +558,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
 	utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
 }
 
-// Reconstruct the list of endpoint infos from the endpointIP list
-// Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos
-// from the full []hostPortInfo slice.
-//
-// For e.g. if input is
-// endpoints = []hostPortInfo{ {host="1.1.1.1", port=22, localEndpointOnly=<bool>}, {host="2.2.2.2", port=80, localEndpointOnly=<bool>} }
-// endpointIPs = []string{ "2.2.2.2:80" }
-//
-// then output will be
-//
-// []endpointsInfo{ {"2.2.2.2:80", localEndpointOnly=<bool>} }
-func buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo {
-	lookupSet := sets.NewString()
-	for _, ip := range endpointIPs {
-		lookupSet.Insert(ip)
-	}
-	var filteredEndpoints []*endpointsInfo
-	for _, hpp := range endPoints {
-		key := net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))
-		if lookupSet.Has(key) {
-			filteredEndpoints = append(filteredEndpoints, &endpointsInfo{endpoint: key, isLocal: hpp.isLocal})
-		}
-	}
-	return filteredEndpoints
-}
-
 // OnEndpointsUpdate takes in a slice of updated endpoints.
 func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
 	proxier.mu.Lock()
@@ -604,19 +580,16 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
 }
 
 // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
-func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string,
-	healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, staleSet map[endpointServicePair]bool) {
+func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, hostname string,
+	healthChecker healthChecker) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) {
 
 	// return values
-	newMap = make(map[proxy.ServicePortName][]*endpointsInfo)
+	newMap = make(proxyEndpointMap)
 	staleSet = make(map[endpointServicePair]bool)
 
-	// local
-	svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo)
-
 	// Update endpoints for services.
 	for i := range allEndpoints {
-		accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap)
+		accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap)
 	}
 	// Check stale connections against endpoints missing from the update.
 	// TODO: we should really only mark a connection stale if the proto was UDP
@@ -646,7 +619,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN
 		allSvcPorts[svcPort] = true
 	}
 	for svcPort := range allSvcPorts {
-		updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker)
+		updateHealthCheckEntries(svcPort.NamespacedName, newMap[svcPort], healthChecker)
 	}
 
 	return newMap, staleSet
@@ -662,9 +635,8 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN
 // - the test for this is overlapped by the test for updateEndpoints
 // - naming is poor and responsibilities are muddled
 func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
-	curEndpoints map[proxy.ServicePortName][]*endpointsInfo,
-	newEndpoints *map[proxy.ServicePortName][]*endpointsInfo,
-	svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo) {
+	curEndpoints proxyEndpointMap,
+	newEndpoints *proxyEndpointMap) {
 
 	// We need to build a map of portname -> all ip:ports for that
 	// portname. Explode Endpoints.Subsets[*] into this structure.
@@ -672,70 +644,45 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
 		ss := &endpoints.Subsets[i]
 		for i := range ss.Ports {
 			port := &ss.Ports[i]
+			if port.Port == 0 {
+				glog.Warningf("ignoring invalid endpoint port %s", port.Name)
+				continue
+			}
 			svcPort := proxy.ServicePortName{
 				NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
 				Port:           port.Name,
 			}
 			for i := range ss.Addresses {
 				addr := &ss.Addresses[i]
-				hostPortObject := hostPortInfo{
-					host:    addr.IP,
-					port:    int(port.Port),
-					isLocal: addr.NodeName != nil && *addr.NodeName == hostname,
+				if addr.IP == "" {
+					glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name)
+					continue
 				}
-				(*svcPortToInfoMap)[svcPort] = append((*svcPortToInfoMap)[svcPort], hostPortObject)
+				epInfo := &endpointsInfo{
+					endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))),
+					isLocal:  addr.NodeName != nil && *addr.NodeName == hostname,
+				}
+				(*newEndpoints)[svcPort] = append((*newEndpoints)[svcPort], epInfo)
 			}
 		}
 	}
-	// Decompose the lists of endpoints into details of what was changed for the caller.
-	for svcPort, hostPortInfos := range *svcPortToInfoMap {
-		newEPList := flattenValidEndpoints(hostPortInfos)
-		glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList)
-		// Once the set operations using the list of ips are complete, build the list of endpoint infos
-		(*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList)
-	}
 }
 
 // updateHealthCheckEntries - send the new set of local endpoints to the health checker
-func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo, healthChecker healthChecker) {
+func updateHealthCheckEntries(name types.NamespacedName, endpoints []*endpointsInfo, healthChecker healthChecker) {
 	if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
 		return
 	}
 
 	// Use a set instead of a slice to provide deduplication
-	endpoints := sets.NewString()
-	for _, portInfo := range hostPorts {
+	epSet := sets.NewString()
+	for _, portInfo := range endpoints {
 		if portInfo.isLocal {
 			// kube-proxy health check only needs local endpoints
-			endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
+			epSet.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
 		}
 	}
-	healthChecker.UpdateEndpoints(name, endpoints)
-}
-
-// used in OnEndpointsUpdate
-type hostPortInfo struct {
-	host    string
-	port    int
-	isLocal bool
-}
-
-func isValidEndpoint(hpp *hostPortInfo) bool {
-	return hpp.host != "" && hpp.port > 0
-}
-
-func flattenValidEndpoints(endpoints []hostPortInfo) []string {
-	// Convert Endpoint objects into strings for easier use later.
-	var result []string
-	for i := range endpoints {
-		hpp := &endpoints[i]
-		if isValidEndpoint(hpp) {
-			result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
-		} else {
-			glog.Warningf("got invalid endpoint: %+v", *hpp)
-		}
-	}
-	return result
+	healthChecker.UpdateEndpoints(name, epSet)
 }
 
 // portProtoHash takes the ServicePortName and protocol for a service
@@ -1336,9 +1336,8 @@ func Test_accumulateEndpointsMap(t *testing.T) {
 
 	for tci, tc := range testCases {
 		// outputs
-		newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{}
-		svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{}
-		accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints, &svcPortToInfoMap)
+		newEndpoints := make(proxyEndpointMap)
+		accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints)
 
 		if len(newEndpoints) != len(tc.expectedNew) {
 			t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints))
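If one wanted to check the exclusion behavior in isolation, a table-driven test in the same spirit as Test_accumulateEndpointsMap might look like the sketch below. It drives the simplified accumulate helper from the earlier sketch (same assumed types, same package), not the real accumulateEndpointsMap.

package main

import "testing"

// Table-driven check that invalid endpoints never reach the output.
// This exercises the illustrative accumulate helper defined earlier,
// not the actual kube-proxy code.
func TestAccumulateSkipsInvalid(t *testing.T) {
	testCases := []struct {
		name     string
		subsets  []subset
		expected []string
	}{
		{"valid endpoint kept", []subset{{ips: []string{"1.1.1.1"}, port: 22}}, []string{"1.1.1.1:22"}},
		{"zero port dropped", []subset{{ips: []string{"2.2.2.2"}, port: 0}}, nil},
		{"empty IP dropped", []subset{{ips: []string{""}, port: 80}}, nil},
	}
	for _, tc := range testCases {
		var eps []*endpointsInfo
		accumulate(tc.subsets, &eps)
		if len(eps) != len(tc.expected) {
			t.Errorf("[%s] expected %d endpoints, got %d", tc.name, len(tc.expected), len(eps))
			continue
		}
		for i := range eps {
			if eps[i].endpoint != tc.expected[i] {
				t.Errorf("[%s] expected %q, got %q", tc.name, tc.expected[i], eps[i].endpoint)
			}
		}
	}
}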