Merge pull request #44053 from thockin/proxy-healthchecks

Automatic merge from submit-queue (batch tested with PRs 43871, 44053)

Proxy healthchecks overhaul

The first commit is #44051 

These three commits are tightly coupled, but should be reviewed one-by-one.  The first adds tests for healthchecks, and found a bug.  The second basically rewrites the healthcheck pkg to be much simpler and less flexible (since we weren't using the flexibility).  The third tweaks how healthchecks are handled in endpoints-path to be more like they are in services-path.

@MrHohn because I know you were in here for source-IP GA work.

@wojtek-t
This commit is contained in:
Kubernetes Submit Queue
2017-04-06 12:36:26 -07:00
committed by GitHub
11 changed files with 690 additions and 670 deletions

View File

@@ -219,7 +219,7 @@ type Proxier struct {
nodeIP net.IP
portMapper portOpener
recorder record.EventRecorder
healthChecker healthChecker
healthChecker healthcheck.Server
}
type localPort struct {
@@ -251,17 +251,6 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
return openLocalPort(lp)
}
type healthChecker interface {
UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String)
}
// TODO: the healthcheck pkg should offer a type
type globalHealthChecker struct{}
func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {
healthcheck.UpdateEndpoints(serviceName, endpointUIDs)
}
// Proxier implements ProxyProvider
var _ proxy.ProxyProvider = &Proxier{}
@@ -315,8 +304,7 @@ func NewProxier(ipt utiliptables.Interface,
glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
}
healthChecker := globalHealthChecker{}
go healthcheck.Run()
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
var throttle flowcontrol.RateLimiter
// Defaulting back to not limit sync rate when minSyncPeriod is 0.
@@ -451,18 +439,12 @@ func (proxier *Proxier) SyncLoop() {
}
}
type healthCheckPort struct {
namespace types.NamespacedName
nodeport int
}
// Accepts a list of Services and the existing service map. Returns the new
// service map, a list of healthcheck ports to add to or remove from the health
// checking listener service, and a set of stale UDP services.
func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
// service map, a map of healthcheck ports, and a set of stale UDP
// services.
func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) {
newServiceMap := make(proxyServiceMap)
healthCheckAdd := make([]healthCheckPort, 0)
healthCheckDel := make([]healthCheckPort, 0)
hcPorts := make(map[types.NamespacedName]uint16)
for _, service := range allServices {
svcName := types.NamespacedName{
@@ -498,12 +480,8 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
}
if !exists || !equal {
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort})
} else {
healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0})
}
if info.onlyNodeLocalEndpoints {
hcPorts[svcName] = uint16(info.healthCheckNodePort)
}
newServiceMap[serviceName] = info
@@ -511,6 +489,13 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa
}
}
for nsn, port := range hcPorts {
if port == 0 {
glog.Errorf("Service %q has no healthcheck nodeport", nsn)
delete(hcPorts, nsn)
}
}
staleUDPServices := sets.NewString()
// Remove serviceports missing from the update.
for name, info := range oldServiceMap {
@@ -519,13 +504,10 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa
if info.protocol == api.ProtocolUDP {
staleUDPServices.Insert(info.clusterIP.String())
}
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort})
}
}
}
return newServiceMap, healthCheckAdd, healthCheckDel, staleUDPServices
return newServiceMap, hcPorts, staleUDPServices
}
// OnServiceUpdate tracks the active set of service proxies.
@@ -538,19 +520,11 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
}
proxier.allServices = allServices
newServiceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap)
for _, hc := range hcAdd {
glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport)
// Turn on healthcheck responder to listen on the health check nodePort
// FIXME: handle failures from adding the service
healthcheck.AddServiceListener(hc.namespace, hc.nodeport)
}
for _, hc := range hcDel {
// Remove ServiceListener health check nodePorts from the health checker
// TODO - Stats
glog.V(4).Infof("Deleting health check for %+v, port %v", hc.namespace, hc.nodeport)
// FIXME: handle failures from deleting the service
healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport)
newServiceMap, hcPorts, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap)
// update healthcheck ports
if err := proxier.healthChecker.SyncServices(hcPorts); err != nil {
glog.Errorf("Error syncing healtcheck ports: %v", err)
}
if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) {
@@ -573,7 +547,13 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
proxier.allEndpoints = allEndpoints
// TODO: once service has made this same transform, move this into proxier.syncProxyRules()
newMap, staleConnections := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker)
newMap, hcEndpoints, staleConnections := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname)
// update healthcheck endpoints
if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
glog.Errorf("Error syncing healthcheck endoints: %v", err)
}
if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) {
proxier.endpointsMap = newMap
proxier.syncProxyRules(syncReasonEndpoints)
@@ -585,11 +565,11 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
}
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string,
healthChecker healthChecker) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) {
func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string) (newMap proxyEndpointMap, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
// return values
newMap = make(proxyEndpointMap)
hcEndpoints = make(map[types.NamespacedName]int)
staleSet = make(map[endpointServicePair]bool)
// Update endpoints for services.
@@ -615,19 +595,30 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
}
}
// Update service health check
allSvcPorts := make(map[proxy.ServicePortName]bool)
for svcPort := range curMap {
allSvcPorts[svcPort] = true
}
for svcPort := range newMap {
allSvcPorts[svcPort] = true
}
for svcPort := range allSvcPorts {
updateHealthCheckEntries(svcPort.NamespacedName, newMap[svcPort], healthChecker)
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}
return newMap, staleSet
// accumulate local IPs per service, ignoring ports
localIPs := map[types.NamespacedName]sets.String{}
for svcPort := range newMap {
for _, ep := range newMap[svcPort] {
if ep.isLocal {
nsn := svcPort.NamespacedName
if localIPs[nsn] == nil {
localIPs[nsn] = sets.NewString()
}
ip := strings.Split(ep.endpoint, ":")[0] // just the IP part
localIPs[nsn].Insert(ip)
}
}
}
// produce a count per service
for nsn, ips := range localIPs {
hcEndpoints[nsn] = len(ips)
}
return newMap, hcEndpoints, staleSet
}
// Gather information about all the endpoint state for a given api.Endpoints.
@@ -675,23 +666,6 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
}
}
// updateHealthCheckEntries - send the new set of local endpoints to the health checker
func updateHealthCheckEntries(name types.NamespacedName, endpoints []*endpointsInfo, healthChecker healthChecker) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}
// Use a set instead of a slice to provide deduplication
epSet := sets.NewString()
for _, portInfo := range endpoints {
if portInfo.isLocal {
// kube-proxy health check only needs local endpoints
epSet.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
}
}
healthChecker.UpdateEndpoints(name, epSet)
}
// portProtoHash takes the ServicePortName and protocol for a service
// returns the associated 16 character hash. This is computed by hashing (sha256)
// then encoding to base32 and truncating to 16 chars. We do this because IPTables