mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #30569 from girishkalele/esipp_healthchecker
Automatic merge from submit-queue Load Balancer Health Check responder library for ESIPP This is an independent component that is needed for the Load Balancer health traffic steering functionality (part of the 1.4 ESIPP work)
This commit is contained in:
commit
0075144475
@ -109,6 +109,7 @@ pkg/kubelet/volumemanager/reconciler
|
||||
pkg/kubelet/volume/populator
|
||||
pkg/kubelet/volume/reconciler
|
||||
pkg/proxy/config
|
||||
pkg/proxy/healthcheck
|
||||
pkg/quota/install
|
||||
pkg/registry
|
||||
pkg/registry/authorization/util
|
||||
|
65
pkg/proxy/healthcheck/api.go
Normal file
65
pkg/proxy/healthcheck/api.go
Normal file
@ -0,0 +1,65 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
// All public API Methods for this package
|
||||
|
||||
// UpdateEndpoints Update the set of local endpoints for a service
|
||||
func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) {
|
||||
req := &proxyMutationRequest{
|
||||
serviceName: serviceName,
|
||||
endpointUids: &endpointUids,
|
||||
}
|
||||
healthchecker.mutationRequestChannel <- req
|
||||
}
|
||||
|
||||
func updateServiceListener(serviceName types.NamespacedName, listenPort int, addOrDelete bool) bool {
|
||||
responseChannel := make(chan bool)
|
||||
req := &proxyListenerRequest{
|
||||
serviceName: serviceName,
|
||||
listenPort: uint16(listenPort),
|
||||
add: addOrDelete,
|
||||
responseChannel: responseChannel,
|
||||
}
|
||||
healthchecker.listenerRequestChannel <- req
|
||||
return <-responseChannel
|
||||
}
|
||||
|
||||
// AddServiceListener Request addition of a listener for a service's health check
|
||||
func AddServiceListener(serviceName types.NamespacedName, listenPort int) bool {
|
||||
return updateServiceListener(serviceName, listenPort, true)
|
||||
}
|
||||
|
||||
// DeleteServiceListener Request addition of a listener for a service's health check
|
||||
func DeleteServiceListener(serviceName types.NamespacedName, listenPort int) bool {
|
||||
return updateServiceListener(serviceName, listenPort, false)
|
||||
}
|
||||
|
||||
// Run Start the healthchecker main loop
|
||||
func Run() {
|
||||
healthchecker = proxyHealthCheckFactory()
|
||||
// Wrap with a wait.Forever to handle panics.
|
||||
go wait.Forever(func() {
|
||||
healthchecker.handlerLoop()
|
||||
}, 0)
|
||||
}
|
18
pkg/proxy/healthcheck/doc.go
Normal file
18
pkg/proxy/healthcheck/doc.go
Normal file
@ -0,0 +1,18 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies
|
||||
package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck"
|
127
pkg/proxy/healthcheck/healthcheck.go
Normal file
127
pkg/proxy/healthcheck/healthcheck.go
Normal file
@ -0,0 +1,127 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
)
|
||||
|
||||
// proxyMutationRequest: Message to request addition/deletion of endpoints for a service
|
||||
type proxyMutationRequest struct {
|
||||
serviceName types.NamespacedName
|
||||
endpointUids *sets.String
|
||||
}
|
||||
|
||||
// proxyListenerRequest: Message to request addition/deletion of a service responder on a listening port
|
||||
type proxyListenerRequest struct {
|
||||
serviceName types.NamespacedName
|
||||
listenPort uint16
|
||||
add bool
|
||||
responseChannel chan bool
|
||||
}
|
||||
|
||||
// serviceEndpointsList: A list of endpoints for a service
|
||||
type serviceEndpointsList struct {
|
||||
serviceName types.NamespacedName
|
||||
endpoints *sets.String
|
||||
}
|
||||
|
||||
// serviceResponder: Contains net/http datastructures necessary for responding to each Service's health check on its aux nodePort
|
||||
type serviceResponder struct {
|
||||
serviceName types.NamespacedName
|
||||
listenPort uint16
|
||||
listener *net.Listener
|
||||
server *http.Server
|
||||
}
|
||||
|
||||
// proxyHC: Handler structure for health check, endpoint add/delete and service listener add/delete requests
|
||||
type proxyHC struct {
|
||||
serviceEndpointsMap cache.ThreadSafeStore
|
||||
serviceResponderMap map[types.NamespacedName]serviceResponder
|
||||
mutationRequestChannel chan *proxyMutationRequest
|
||||
listenerRequestChannel chan *proxyListenerRequest
|
||||
}
|
||||
|
||||
// handleHealthCheckRequest - received a health check request - lookup and respond to HC.
|
||||
func (h *proxyHC) handleHealthCheckRequest(rw http.ResponseWriter, serviceName string) {
|
||||
s, ok := h.serviceEndpointsMap.Get(serviceName)
|
||||
if !ok {
|
||||
glog.V(4).Infof("Service %s not found or has no local endpoints", serviceName)
|
||||
sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "No Service Endpoints Not Found")
|
||||
return
|
||||
}
|
||||
numEndpoints := len(*s.(*serviceEndpointsList).endpoints)
|
||||
if numEndpoints > 0 {
|
||||
sendHealthCheckResponse(rw, http.StatusOK, fmt.Sprintf("%d Service Endpoints found", numEndpoints))
|
||||
return
|
||||
}
|
||||
sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "0 local Endpoints are alive")
|
||||
}
|
||||
|
||||
// handleMutationRequest - receive requests to mutate the table entry for a service
|
||||
func (h *proxyHC) handleMutationRequest(req *proxyMutationRequest) {
|
||||
numEndpoints := len(*req.endpointUids)
|
||||
glog.V(4).Infof("LB service health check mutation request Service: %s - %d Endpoints %v",
|
||||
req.serviceName, numEndpoints, (*req.endpointUids).List())
|
||||
if numEndpoints == 0 {
|
||||
if _, ok := h.serviceEndpointsMap.Get(req.serviceName.String()); ok {
|
||||
glog.V(4).Infof("Deleting endpoints map for service %s, all local endpoints gone", req.serviceName.String())
|
||||
h.serviceEndpointsMap.Delete(req.serviceName.String())
|
||||
}
|
||||
return
|
||||
}
|
||||
var entry *serviceEndpointsList
|
||||
e, exists := h.serviceEndpointsMap.Get(req.serviceName.String())
|
||||
if exists {
|
||||
entry = e.(*serviceEndpointsList)
|
||||
if entry.endpoints.Equal(*req.endpointUids) {
|
||||
return
|
||||
}
|
||||
// Compute differences just for printing logs about additions and removals
|
||||
deletedEndpoints := entry.endpoints.Difference(*req.endpointUids)
|
||||
newEndpoints := req.endpointUids.Difference(*entry.endpoints)
|
||||
for _, e := range newEndpoints.List() {
|
||||
glog.V(4).Infof("Adding local endpoint %s to LB health check for service %s",
|
||||
e, req.serviceName.String())
|
||||
}
|
||||
for _, d := range deletedEndpoints.List() {
|
||||
glog.V(4).Infof("Deleted endpoint %s from service %s LB health check (%d endpoints left)",
|
||||
d, req.serviceName.String(), len(*entry.endpoints))
|
||||
}
|
||||
}
|
||||
entry = &serviceEndpointsList{serviceName: req.serviceName, endpoints: req.endpointUids}
|
||||
h.serviceEndpointsMap.Add(req.serviceName.String(), entry)
|
||||
}
|
||||
|
||||
// proxyHealthCheckRequest - Factory method to instantiate the health check handler
|
||||
func proxyHealthCheckFactory() *proxyHC {
|
||||
glog.V(2).Infof("Initializing kube-proxy health checker")
|
||||
phc := &proxyHC{
|
||||
serviceEndpointsMap: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
|
||||
serviceResponderMap: make(map[types.NamespacedName]serviceResponder),
|
||||
mutationRequestChannel: make(chan *proxyMutationRequest, 1024),
|
||||
listenerRequestChannel: make(chan *proxyListenerRequest, 1024),
|
||||
}
|
||||
return phc
|
||||
}
|
158
pkg/proxy/healthcheck/healthcheck_test.go
Normal file
158
pkg/proxy/healthcheck/healthcheck_test.go
Normal file
@ -0,0 +1,158 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
)
|
||||
|
||||
type TestCaseData struct {
|
||||
nodePorts int
|
||||
numEndpoints int
|
||||
nodePortList []int
|
||||
svcNames []types.NamespacedName
|
||||
}
|
||||
|
||||
const (
|
||||
startPort = 20000
|
||||
endPort = 40000
|
||||
)
|
||||
|
||||
var (
|
||||
choices = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
||||
)
|
||||
|
||||
func generateRandomString(n int) string {
|
||||
|
||||
b := make([]byte, n)
|
||||
l := len(choices)
|
||||
for i := range b {
|
||||
b[i] = choices[rand.Intn(l)]
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func chooseServiceName(tc int, hint int) types.NamespacedName {
|
||||
var svc types.NamespacedName
|
||||
svc.Namespace = fmt.Sprintf("ns_%d", tc)
|
||||
svc.Name = fmt.Sprintf("name_%d", hint)
|
||||
return svc
|
||||
}
|
||||
|
||||
func generateEndpointSet(max int) sets.String {
|
||||
s := sets.NewString()
|
||||
for i := 0; i < max; i++ {
|
||||
s.Insert(fmt.Sprintf("%d%s", i, generateRandomString(8)))
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func verifyHealthChecks(tc *TestCaseData, t *testing.T) bool {
|
||||
var success = true
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
for i := 0; i < tc.nodePorts; i++ {
|
||||
t.Logf("Validating HealthCheck works for svc %s nodePort %d\n", tc.svcNames[i], tc.nodePortList[i])
|
||||
res, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", tc.nodePortList[i]))
|
||||
if err != nil {
|
||||
t.Logf("ERROR: Failed to connect to listening port")
|
||||
success = false
|
||||
continue
|
||||
}
|
||||
robots, err := ioutil.ReadAll(res.Body)
|
||||
if res.StatusCode == http.StatusServiceUnavailable {
|
||||
t.Logf("ERROR: HealthCheck returned %s: %s", res.Status, string(robots))
|
||||
success = false
|
||||
continue
|
||||
}
|
||||
res.Body.Close()
|
||||
if err != nil {
|
||||
t.Logf("Error: reading body of response (%s)", err)
|
||||
success = false
|
||||
continue
|
||||
}
|
||||
}
|
||||
if success {
|
||||
t.Logf("Success: All nodePorts found active")
|
||||
}
|
||||
return success
|
||||
}
|
||||
|
||||
func TestHealthChecker(t *testing.T) {
|
||||
testcases := []TestCaseData{
|
||||
{
|
||||
nodePorts: 1,
|
||||
numEndpoints: 2,
|
||||
},
|
||||
{
|
||||
nodePorts: 10,
|
||||
numEndpoints: 6,
|
||||
},
|
||||
{
|
||||
nodePorts: 100,
|
||||
numEndpoints: 1,
|
||||
},
|
||||
}
|
||||
|
||||
Run()
|
||||
|
||||
ports := startPort
|
||||
for n, tc := range testcases {
|
||||
tc.nodePortList = make([]int, tc.nodePorts)
|
||||
tc.svcNames = make([]types.NamespacedName, tc.nodePorts)
|
||||
for i := 0; i < tc.nodePorts; i++ {
|
||||
tc.svcNames[i] = chooseServiceName(n, i)
|
||||
t.Logf("Updating endpoints map for %s %d", tc.svcNames[i], tc.numEndpoints)
|
||||
for {
|
||||
UpdateEndpoints(tc.svcNames[i], generateEndpointSet(tc.numEndpoints))
|
||||
tc.nodePortList[i] = ports
|
||||
ports++
|
||||
if AddServiceListener(tc.svcNames[i], tc.nodePortList[i]) {
|
||||
break
|
||||
}
|
||||
DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i])
|
||||
// Keep searching for a port that works
|
||||
t.Logf("Failed to bind/listen on port %d...trying next port", ports-1)
|
||||
if ports > endPort {
|
||||
t.Errorf("Exhausted range of ports available for tests")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
t.Logf("Validating if all nodePorts for tc %d work", n)
|
||||
if !verifyHealthChecks(&tc, t) {
|
||||
t.Errorf("Healthcheck validation failed")
|
||||
}
|
||||
|
||||
for i := 0; i < tc.nodePorts; i++ {
|
||||
DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i])
|
||||
UpdateEndpoints(tc.svcNames[i], sets.NewString())
|
||||
}
|
||||
|
||||
// Ensure that all listeners have been shutdown
|
||||
if verifyHealthChecks(&tc, t) {
|
||||
t.Errorf("Healthcheck validation failed")
|
||||
}
|
||||
}
|
||||
}
|
46
pkg/proxy/healthcheck/http.go
Normal file
46
pkg/proxy/healthcheck/http.go
Normal file
@ -0,0 +1,46 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// A healthCheckHandler serves http requests on /healthz on the service health check node port,
|
||||
// and responds to every request with either:
|
||||
// 200 OK and the count of endpoints for the given service that are local to this node.
|
||||
// or
|
||||
// 503 Service Unavailable If the count is zero or the service does not exist
|
||||
type healthCheckHandler struct {
|
||||
svcNsName string
|
||||
}
|
||||
|
||||
// HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object
|
||||
func sendHealthCheckResponse(rw http.ResponseWriter, statusCode int, error string) {
|
||||
rw.Header().Set("Content-Type", "text/plain")
|
||||
rw.WriteHeader(statusCode)
|
||||
fmt.Fprint(rw, error)
|
||||
}
|
||||
|
||||
// ServeHTTP: Interface callback method for net.Listener Handlers
|
||||
func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) {
|
||||
glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcNsName)
|
||||
healthchecker.handleHealthCheckRequest(response, h.svcNsName)
|
||||
}
|
77
pkg/proxy/healthcheck/listener.go
Normal file
77
pkg/proxy/healthcheck/listener.go
Normal file
@ -0,0 +1,77 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthcheck
|
||||
|
||||
// Create/Delete dynamic listeners on the required nodePorts
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// handleServiceListenerRequest: receive requests to add/remove service health check listening ports
|
||||
func (h *proxyHC) handleServiceListenerRequest(req *proxyListenerRequest) bool {
|
||||
sr, serviceFound := h.serviceResponderMap[req.serviceName]
|
||||
if !req.add {
|
||||
if !serviceFound {
|
||||
return false
|
||||
}
|
||||
glog.Infof("Deleting HealthCheckListenPort for service %s port %d",
|
||||
req.serviceName, req.listenPort)
|
||||
delete(h.serviceResponderMap, req.serviceName)
|
||||
(*sr.listener).Close()
|
||||
return true
|
||||
} else if serviceFound {
|
||||
if req.listenPort == sr.listenPort {
|
||||
// Addition requested but responder for service already exists and port is unchanged
|
||||
return true
|
||||
}
|
||||
// Addition requested but responder for service already exists but the listen port has changed
|
||||
glog.Infof("HealthCheckListenPort for service %s changed from %d to %d - closing old listening port",
|
||||
req.serviceName, sr.listenPort, req.listenPort)
|
||||
delete(h.serviceResponderMap, req.serviceName)
|
||||
(*sr.listener).Close()
|
||||
}
|
||||
// Create a service responder object and start listening and serving on the provided port
|
||||
glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort)
|
||||
server := http.Server{
|
||||
Addr: fmt.Sprintf(":%d", req.listenPort),
|
||||
Handler: healthCheckHandler{svcNsName: req.serviceName.String()},
|
||||
}
|
||||
listener, err := net.Listen("tcp", server.Addr)
|
||||
if err != nil {
|
||||
glog.Warningf("FAILED to listen on address %s (%s)\n", server.Addr, err)
|
||||
return false
|
||||
}
|
||||
h.serviceResponderMap[req.serviceName] = serviceResponder{serviceName: req.serviceName,
|
||||
listenPort: req.listenPort,
|
||||
listener: &listener,
|
||||
server: &server}
|
||||
go func() {
|
||||
// Anonymous goroutine to block on Serve for this listen port - Serve will exit when the listener is closed
|
||||
glog.V(3).Infof("Goroutine blocking on serving health checks for %s on port %d", req.serviceName, req.listenPort)
|
||||
if err := server.Serve(listener); err != nil {
|
||||
glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed with error %s\n", req.listenPort, req.serviceName, err)
|
||||
return
|
||||
}
|
||||
glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed\n", req.listenPort, req.serviceName)
|
||||
}()
|
||||
return true
|
||||
}
|
53
pkg/proxy/healthcheck/worker.go
Normal file
53
pkg/proxy/healthcheck/worker.go
Normal file
@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies
|
||||
package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
var healthchecker *proxyHC
|
||||
|
||||
// handlerLoop Serializes all requests to prevent concurrent access to the maps
|
||||
func (h *proxyHC) handlerLoop() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case req := <-h.mutationRequestChannel:
|
||||
h.handleMutationRequest(req)
|
||||
case req := <-h.listenerRequestChannel:
|
||||
req.responseChannel <- h.handleServiceListenerRequest(req)
|
||||
case <-ticker.C:
|
||||
go h.sync()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *proxyHC) sync() {
|
||||
glog.V(4).Infof("%d Health Check Listeners", len(h.serviceResponderMap))
|
||||
glog.V(4).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List()))
|
||||
for _, svc := range h.serviceEndpointsMap.ListKeys() {
|
||||
if e, ok := h.serviceEndpointsMap.Get(svc); ok {
|
||||
endpointList := e.(*serviceEndpointsList)
|
||||
glog.V(4).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len())
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user