diff --git a/pkg/proxy/healthcheck/healthcheck.go b/pkg/proxy/healthcheck/healthcheck.go index 26e8b721ac1..9e6cf7028a6 100644 --- a/pkg/proxy/healthcheck/healthcheck.go +++ b/pkg/proxy/healthcheck/healthcheck.go @@ -22,6 +22,8 @@ import ( "net/http" "strings" "sync" + "sync/atomic" + "time" "github.com/golang/glog" "github.com/renstrom/dedent" @@ -29,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/clock" "k8s.io/kubernetes/pkg/api" ) @@ -233,3 +236,92 @@ func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) erro } return nil } + +// HealthzUpdater allows callers to update healthz timestamp only. +type HealthzUpdater interface { + UpdateTimestamp() +} + +// HealthzServer returns 200 "OK" by default. Once timestamp has been +// updated, it verifies we don't exceed max no respond duration since +// last update. +type HealthzServer struct { + listener Listener + httpFactory HTTPServerFactory + clock clock.Clock + + addr string + port int32 + healthTimeout time.Duration + + lastUpdated atomic.Value +} + +// NewDefaultHealthzServer returns a default healthz http server. +func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer { + return newHealthzServer(nil, nil, nil, addr, healthTimeout) +} + +func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer { + if listener == nil { + listener = stdNetListener{} + } + if httpServerFactory == nil { + httpServerFactory = stdHTTPServerFactory{} + } + if c == nil { + c = clock.RealClock{} + } + return &HealthzServer{ + listener: listener, + httpFactory: httpServerFactory, + clock: c, + addr: addr, + healthTimeout: healthTimeout, + } +} + +// UpdateTimestamp updates the lastUpdated timestamp. +func (hs *HealthzServer) UpdateTimestamp() { + hs.lastUpdated.Store(hs.clock.Now()) +} + +// Run starts the healthz http server and returns. +func (hs *HealthzServer) Run() { + serveMux := http.NewServeMux() + serveMux.Handle("/healthz", healthzHandler{hs: hs}) + server := hs.httpFactory.New(hs.addr, serveMux) + listener, err := hs.listener.Listen(hs.addr) + if err != nil { + glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err) + return + } + go func() { + glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr) + if err := server.Serve(listener); err != nil { + glog.Errorf("Healhz closed: %v", err) + return + } + glog.Errorf("Unexpected healhz closed.") + }() +} + +type healthzHandler struct { + hs *HealthzServer +} + +func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + lastUpdated := time.Time{} + if val := h.hs.lastUpdated.Load(); val != nil { + lastUpdated = val.(time.Time) + } + currentTime := h.hs.clock.Now() + + resp.Header().Set("Content-Type", "application/json") + if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) { + resp.WriteHeader(http.StatusServiceUnavailable) + } else { + resp.WriteHeader(http.StatusOK) + } + fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)) +} diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index a90a8a386ec..c39652f8ec2 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -22,11 +22,13 @@ import ( "net/http" "net/http/httptest" "testing" - - "github.com/davecgh/go-spew/spew" + "time" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/clock" + + "github.com/davecgh/go-spew/spew" ) type fakeListener struct { @@ -108,6 +110,11 @@ type hcPayload struct { LocalEndpoints int } +type healthzPayload struct { + LastUpdated string + CurrentTime string +} + func TestServer(t *testing.T) { listener := newFakeListener() httpFactory := newFakeHTTPServerFactory() @@ -355,3 +362,44 @@ func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints in t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints) } } + +func TestHealthzServer(t *testing.T) { + listener := newFakeListener() + httpFactory := newFakeHTTPServerFactory() + fakeClock := clock.NewFakeClock(time.Now()) + + hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second) + server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs}) + + // Should return 200 "OK" by default. + testHealthzHandler(server, http.StatusOK, t) + + // Should return 503 "ServiceUnavailable" if exceed max no respond duration. + hs.UpdateTimestamp() + fakeClock.Step(25 * time.Second) + testHealthzHandler(server, http.StatusServiceUnavailable, t) + + // Should return 200 "OK" if timestamp is valid. + hs.UpdateTimestamp() + fakeClock.Step(5 * time.Second) + testHealthzHandler(server, http.StatusOK, t) +} + +func testHealthzHandler(server HTTPServer, status int, t *testing.T) { + handler := server.(*fakeHTTPServer).handler + req, err := http.NewRequest("GET", "/healthz", nil) + if err != nil { + t.Fatal(err) + } + resp := httptest.NewRecorder() + + handler.ServeHTTP(resp, req) + + if resp.Code != status { + t.Errorf("expected status code %v, got %v", status, resp.Code) + } + var payload healthzPayload + if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil { + t.Fatal(err) + } +}