diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index a3d59d50120..7cb395b5089 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -131,9 +131,9 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) error { // TODO: document all handlers // InstallSupport registers the APIServer support functions -func InstallSupport(mux Mux, ws *restful.WebService, enableResettingMetrics bool) { +func InstallSupport(mux Mux, ws *restful.WebService, enableResettingMetrics bool, checks ...healthz.HealthzChecker) { // TODO: convert healthz and metrics to restful and remove container arg - healthz.InstallHandler(mux) + healthz.InstallHandler(mux, checks...) mux.Handle("/metrics", prometheus.Handler()) if enableResettingMetrics { mux.HandleFunc("/resetMetrics", metrics.Reset) diff --git a/pkg/master/master.go b/pkg/master/master.go index 1bd036994e3..cb01bca77e4 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -30,6 +30,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission" @@ -44,6 +45,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/componentstatus" @@ -77,6 +79,7 @@ import ( "github.com/emicklei/go-restful" "github.com/emicklei/go-restful/swagger" "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" ) const ( @@ -215,10 +218,13 @@ type Master struct { InsecureHandler http.Handler // Used for secure proxy - dialer apiserver.ProxyDialerFunc - tunnels *util.SSHTunnelList - tunnelsLock sync.Mutex - installSSHKey InstallSSHKey + dialer apiserver.ProxyDialerFunc + tunnels *util.SSHTunnelList + tunnelsLock sync.Mutex + installSSHKey InstallSSHKey + lastSync int64 // Seconds since Epoch + lastSyncMetric prometheus.GaugeFunc + clock util.Clock } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -417,6 +423,8 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) // init initializes master. func (m *Master) init(c *Config) { + healthzChecks := []healthz.HealthzChecker{} + m.clock = util.RealClock{} podStorage := podetcd.NewStorage(c.EtcdHelper, c.KubeletClient) podRegistry := pod.NewRegistry(podStorage.Pod) @@ -526,6 +534,7 @@ func (m *Master) init(c *Config) { m.tunnels = &util.SSHTunnelList{} m.dialer = m.Dial m.setupSecureProxy(c.SSHUser, c.SSHKeyfile, publicKeyFile) + m.lastSync = m.clock.Now().Unix() // This is pretty ugly. A better solution would be to pull this all the way up into the // server.go file. @@ -541,6 +550,11 @@ func (m *Master) init(c *Config) { } else { glog.Errorf("Failed to cast %v to HTTPKubeletClient, skipping SSH tunnel.") } + healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy)) + m.lastSyncMetric = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "apiserver_proxy_tunnel_sync_latency_secs", + Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.", + }, func() float64 { return float64(m.secondsSinceSync()) }) } apiVersions := []string{} @@ -557,7 +571,7 @@ func (m *Master) init(c *Config) { apiVersions = append(apiVersions, "v1") } - apiserver.InstallSupport(m.muxHelper, m.rootWebService, c.EnableProfiling) + apiserver.InstallSupport(m.muxHelper, m.rootWebService, c.EnableProfiling, healthzChecks...) apiserver.AddApiWebService(m.handlerContainer, c.APIPrefix, apiVersions) defaultVersion := m.defaultAPIGroupVersion() requestInfoResolver := &apiserver.APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(defaultVersion.Root, "/")), defaultVersion.Mapper} @@ -838,6 +852,20 @@ func (m *Master) getNodeAddresses() ([]string, error) { return addrs, nil } +func (m *Master) IsTunnelSyncHealthy(req *http.Request) error { + lag := m.secondsSinceSync() + if lag > 600 { + return fmt.Errorf("Tunnel sync is taking to long: %d", lag) + } + return nil +} + +func (m *Master) secondsSinceSync() int64 { + now := m.clock.Now().Unix() + then := atomic.LoadInt64(&m.lastSync) + return now - then +} + func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error { glog.Infof("replacing tunnels. New addrs: %v", newAddrs) tunnels := util.MakeSSHTunnels(user, keyfile, newAddrs) @@ -850,6 +878,7 @@ func (m *Master) replaceTunnels(user, keyfile string, newAddrs []string) error { m.tunnels.Close() } m.tunnels = tunnels + atomic.StoreInt64(&m.lastSync, m.clock.Now().Unix()) return nil }