diff --git a/cmd/kube-proxy/BUILD b/cmd/kube-proxy/BUILD index 27170c26a7b..4cfdfcf6f8a 100644 --- a/cmd/kube-proxy/BUILD +++ b/cmd/kube-proxy/BUILD @@ -31,7 +31,6 @@ go_library( "//pkg/version/prometheus:go_default_library", "//pkg/version/verflag:go_default_library", "//vendor/github.com/spf13/pflag:go_default_library", - "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library", ], diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index b7872bd8010..2b613b79d12 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -25,6 +25,7 @@ go_library( "//pkg/kubelet/qos:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/config:go_default_library", + "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/iptables:go_default_library", "//pkg/proxy/userspace:go_default_library", "//pkg/proxy/winuserspace:go_default_library", diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index e42e31341e3..7f308783604 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -51,6 +51,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/proxy" proxyconfig "k8s.io/kubernetes/pkg/proxy/config" + "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/proxy/winuserspace" @@ -249,7 +250,7 @@ func applyDefaults(in *componentconfig.KubeProxyConfiguration) (*componentconfig func NewProxyCommand() *cobra.Command { opts := Options{ config: new(componentconfig.KubeProxyConfiguration), - healthzPort: 10249, + healthzPort: 10256, } cmd := &cobra.Command{ @@ -296,7 +297,7 @@ type ProxyServer struct { ProxyMode string NodeRef *clientv1.ObjectReference CleanupAndExit bool - HealthzBindAddress string + MetricsBindAddress string OOMScoreAdj *int32 ResourceContainer string ConfigSyncPeriod time.Duration @@ -305,6 +306,7 @@ type ProxyServer struct { // get rid of this one. ServiceHandler proxyconfig.ServiceConfigHandler EndpointsEventHandler proxyconfig.EndpointsHandler + HealthzServer *healthcheck.HealthzServer } // createClients creates a kube client and an event client from the given config and masterOverride. @@ -388,6 +390,11 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname}) + var healthzServer *healthcheck.HealthzServer + if len(config.HealthzBindAddress) > 0 { + healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration) + } + var proxier proxy.ProxyProvider var serviceEventHandler proxyconfig.ServiceHandler // TODO: Migrate all handlers to ServiceHandler types and @@ -416,6 +423,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx hostname, getNodeIP(client, hostname), recorder, + healthzServer, ) if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) @@ -504,13 +512,14 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx Conntracker: &realConntracker{}, ProxyMode: proxyMode, NodeRef: nodeRef, - HealthzBindAddress: config.HealthzBindAddress, + MetricsBindAddress: config.MetricsBindAddress, OOMScoreAdj: config.OOMScoreAdj, ResourceContainer: config.ResourceContainer, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, ServiceEventHandler: serviceEventHandler, ServiceHandler: serviceHandler, EndpointsEventHandler: endpointsEventHandler, + HealthzServer: healthzServer, }, nil } @@ -546,17 +555,22 @@ func (s *ProxyServer) Run() error { s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")}) - // Start up a webserver if requested - if len(s.HealthzBindAddress) > 0 { + // Start up a healthz server if requested + if s.HealthzServer != nil { + s.HealthzServer.Run() + } + + // Start up a metrics server if requested + if len(s.MetricsBindAddress) > 0 { http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "%s", s.ProxyMode) }) http.Handle("/metrics", prometheus.Handler()) configz.InstallHandler(http.DefaultServeMux) go wait.Until(func() { - err := http.ListenAndServe(s.HealthzBindAddress, nil) + err := http.ListenAndServe(s.MetricsBindAddress, nil) if err != nil { - utilruntime.HandleError(fmt.Errorf("starting health server failed: %v", err)) + utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err)) } }, 5*time.Second, wait.NeverStop) } diff --git a/cmd/kube-proxy/proxy.go b/cmd/kube-proxy/proxy.go index ab1d5df90d3..5d6ba8a8cb0 100644 --- a/cmd/kube-proxy/proxy.go +++ b/cmd/kube-proxy/proxy.go @@ -22,7 +22,6 @@ import ( "github.com/spf13/pflag" - "k8s.io/apiserver/pkg/server/healthz" utilflag "k8s.io/apiserver/pkg/util/flag" "k8s.io/apiserver/pkg/util/logs" "k8s.io/kubernetes/cmd/kube-proxy/app" @@ -32,8 +31,6 @@ import ( ) func main() { - healthz.DefaultHealthz() - command := app.NewProxyCommand() // TODO: once we switch everything over to Cobra commands, we can go back to calling diff --git a/hack/.linted_packages b/hack/.linted_packages index a5a6b85717d..ca02c7284ee 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -199,6 +199,7 @@ pkg/kubelet/volumemanager/cache pkg/kubelet/volumemanager/populator pkg/kubelet/volumemanager/reconciler pkg/labels +pkg/master/ports pkg/printers pkg/proxy/config pkg/proxy/healthcheck diff --git a/pkg/apis/componentconfig/types.go b/pkg/apis/componentconfig/types.go index 91ab2c57847..ef8d0ca23dc 100644 --- a/pkg/apis/componentconfig/types.go +++ b/pkg/apis/componentconfig/types.go @@ -96,8 +96,11 @@ type KubeProxyConfiguration struct { // for all interfaces) BindAddress string // healthzBindAddress is the IP address and port for the health check server to serve on, - // defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces) + // defaulting to 0.0.0.0:10256 HealthzBindAddress string + // metricsBindAddress is the IP address and port for the metrics server to serve on, + // defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces) + MetricsBindAddress string // clusterCIDR is the CIDR range of the pods in the cluster. It is used to // bridge traffic coming from outside of the cluster. If not provided, // no off-cluster bridging will be performed. diff --git a/pkg/apis/componentconfig/v1alpha1/defaults.go b/pkg/apis/componentconfig/v1alpha1/defaults.go index b6c15e8696b..7bd93b8f726 100644 --- a/pkg/apis/componentconfig/v1alpha1/defaults.go +++ b/pkg/apis/componentconfig/v1alpha1/defaults.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + "fmt" "path/filepath" "runtime" "strings" @@ -63,10 +64,15 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) { if len(obj.BindAddress) == 0 { obj.BindAddress = "0.0.0.0" } - if len(obj.HealthzBindAddress) == 0 { - obj.HealthzBindAddress = "127.0.0.1:10249" + if obj.HealthzBindAddress == "" { + obj.HealthzBindAddress = fmt.Sprintf("0.0.0.0:%v", ports.ProxyHealthzPort) } else if !strings.Contains(obj.HealthzBindAddress, ":") { - obj.HealthzBindAddress = ":10249" + obj.HealthzBindAddress += fmt.Sprintf(":%v", ports.ProxyHealthzPort) + } + if obj.MetricsBindAddress == "" { + obj.MetricsBindAddress = fmt.Sprintf("127.0.0.1:%v", ports.ProxyStatusPort) + } else if !strings.Contains(obj.MetricsBindAddress, ":") { + obj.MetricsBindAddress += fmt.Sprintf(":%v", ports.ProxyStatusPort) } if obj.OOMScoreAdj == nil { temp := int32(qos.KubeProxyOOMScoreAdj) diff --git a/pkg/apis/componentconfig/v1alpha1/types.go b/pkg/apis/componentconfig/v1alpha1/types.go index 325aea45c1f..90b982099e4 100644 --- a/pkg/apis/componentconfig/v1alpha1/types.go +++ b/pkg/apis/componentconfig/v1alpha1/types.go @@ -92,8 +92,11 @@ type KubeProxyConfiguration struct { // for all interfaces) BindAddress string `json:"bindAddress"` // healthzBindAddress is the IP address and port for the health check server to serve on, - // defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces) + // defaulting to 0.0.0.0:10256 HealthzBindAddress string `json:"healthzBindAddress"` + // metricsBindAddress is the IP address and port for the metrics server to serve on, + // defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces) + MetricsBindAddress string `json:"metricsBindAddress"` // clusterCIDR is the CIDR range of the pods in the cluster. It is used to // bridge traffic coming from outside of the cluster. If not provided, // no off-cluster bridging will be performed. diff --git a/pkg/apis/componentconfig/v1alpha1/zz_generated.conversion.go b/pkg/apis/componentconfig/v1alpha1/zz_generated.conversion.go index d0bbf4aeb17..be30b4956eb 100644 --- a/pkg/apis/componentconfig/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/componentconfig/v1alpha1/zz_generated.conversion.go @@ -99,6 +99,7 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_componentconfig_KubeProxyCon out.FeatureGates = in.FeatureGates out.BindAddress = in.BindAddress out.HealthzBindAddress = in.HealthzBindAddress + out.MetricsBindAddress = in.MetricsBindAddress out.ClusterCIDR = in.ClusterCIDR out.HostnameOverride = in.HostnameOverride if err := Convert_v1alpha1_ClientConnectionConfiguration_To_componentconfig_ClientConnectionConfiguration(&in.ClientConnection, &out.ClientConnection, s); err != nil { @@ -128,6 +129,7 @@ func autoConvert_componentconfig_KubeProxyConfiguration_To_v1alpha1_KubeProxyCon out.FeatureGates = in.FeatureGates out.BindAddress = in.BindAddress out.HealthzBindAddress = in.HealthzBindAddress + out.MetricsBindAddress = in.MetricsBindAddress out.ClusterCIDR = in.ClusterCIDR out.HostnameOverride = in.HostnameOverride if err := Convert_componentconfig_ClientConnectionConfiguration_To_v1alpha1_ClientConnectionConfiguration(&in.ClientConnection, &out.ClientConnection, s); err != nil { diff --git a/pkg/master/ports/ports.go b/pkg/master/ports/ports.go index 4f3eed88969..14508cc27f3 100644 --- a/pkg/master/ports/ports.go +++ b/pkg/master/ports/ports.go @@ -17,7 +17,7 @@ limitations under the License. package ports const ( - // ProxyPort is the default port for the proxy healthz server. + // ProxyStatusPort is the default port for the proxy metrics server. // May be overridden by a flag at startup. ProxyStatusPort = 10249 // KubeletPort is the default port for the kubelet server on each host machine. @@ -38,4 +38,7 @@ const ( // until heapster can transition to using the SSL endpoint. // TODO(roberthbailey): Remove this once we have a better solution for heapster. KubeletReadOnlyPort = 10255 + // ProxyHealthzPort is the default port for the proxy healthz server. + // May be overridden by a flag at startup. + ProxyHealthzPort = 10256 ) diff --git a/pkg/proxy/healthcheck/BUILD b/pkg/proxy/healthcheck/BUILD index b01bf8b12c7..7cb0d680544 100644 --- a/pkg/proxy/healthcheck/BUILD +++ b/pkg/proxy/healthcheck/BUILD @@ -22,6 +22,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/clock:go_default_library", ], ) @@ -34,6 +35,7 @@ go_test( "//vendor/github.com/davecgh/go-spew/spew:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/client-go/util/clock:go_default_library", ], ) diff --git a/pkg/proxy/healthcheck/healthcheck.go b/pkg/proxy/healthcheck/healthcheck.go index 26e8b721ac1..9e6cf7028a6 100644 --- a/pkg/proxy/healthcheck/healthcheck.go +++ b/pkg/proxy/healthcheck/healthcheck.go @@ -22,6 +22,8 @@ import ( "net/http" "strings" "sync" + "sync/atomic" + "time" "github.com/golang/glog" "github.com/renstrom/dedent" @@ -29,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/clock" "k8s.io/kubernetes/pkg/api" ) @@ -233,3 +236,92 @@ func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) erro } return nil } + +// HealthzUpdater allows callers to update healthz timestamp only. +type HealthzUpdater interface { + UpdateTimestamp() +} + +// HealthzServer returns 200 "OK" by default. Once timestamp has been +// updated, it verifies we don't exceed max no respond duration since +// last update. +type HealthzServer struct { + listener Listener + httpFactory HTTPServerFactory + clock clock.Clock + + addr string + port int32 + healthTimeout time.Duration + + lastUpdated atomic.Value +} + +// NewDefaultHealthzServer returns a default healthz http server. +func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer { + return newHealthzServer(nil, nil, nil, addr, healthTimeout) +} + +func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer { + if listener == nil { + listener = stdNetListener{} + } + if httpServerFactory == nil { + httpServerFactory = stdHTTPServerFactory{} + } + if c == nil { + c = clock.RealClock{} + } + return &HealthzServer{ + listener: listener, + httpFactory: httpServerFactory, + clock: c, + addr: addr, + healthTimeout: healthTimeout, + } +} + +// UpdateTimestamp updates the lastUpdated timestamp. +func (hs *HealthzServer) UpdateTimestamp() { + hs.lastUpdated.Store(hs.clock.Now()) +} + +// Run starts the healthz http server and returns. +func (hs *HealthzServer) Run() { + serveMux := http.NewServeMux() + serveMux.Handle("/healthz", healthzHandler{hs: hs}) + server := hs.httpFactory.New(hs.addr, serveMux) + listener, err := hs.listener.Listen(hs.addr) + if err != nil { + glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err) + return + } + go func() { + glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr) + if err := server.Serve(listener); err != nil { + glog.Errorf("Healhz closed: %v", err) + return + } + glog.Errorf("Unexpected healhz closed.") + }() +} + +type healthzHandler struct { + hs *HealthzServer +} + +func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + lastUpdated := time.Time{} + if val := h.hs.lastUpdated.Load(); val != nil { + lastUpdated = val.(time.Time) + } + currentTime := h.hs.clock.Now() + + resp.Header().Set("Content-Type", "application/json") + if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) { + resp.WriteHeader(http.StatusServiceUnavailable) + } else { + resp.WriteHeader(http.StatusOK) + } + fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)) +} diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index a90a8a386ec..c39652f8ec2 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -22,11 +22,13 @@ import ( "net/http" "net/http/httptest" "testing" - - "github.com/davecgh/go-spew/spew" + "time" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/clock" + + "github.com/davecgh/go-spew/spew" ) type fakeListener struct { @@ -108,6 +110,11 @@ type hcPayload struct { LocalEndpoints int } +type healthzPayload struct { + LastUpdated string + CurrentTime string +} + func TestServer(t *testing.T) { listener := newFakeListener() httpFactory := newFakeHTTPServerFactory() @@ -355,3 +362,44 @@ func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints in t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints) } } + +func TestHealthzServer(t *testing.T) { + listener := newFakeListener() + httpFactory := newFakeHTTPServerFactory() + fakeClock := clock.NewFakeClock(time.Now()) + + hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second) + server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs}) + + // Should return 200 "OK" by default. + testHealthzHandler(server, http.StatusOK, t) + + // Should return 503 "ServiceUnavailable" if exceed max no respond duration. + hs.UpdateTimestamp() + fakeClock.Step(25 * time.Second) + testHealthzHandler(server, http.StatusServiceUnavailable, t) + + // Should return 200 "OK" if timestamp is valid. + hs.UpdateTimestamp() + fakeClock.Step(5 * time.Second) + testHealthzHandler(server, http.StatusOK, t) +} + +func testHealthzHandler(server HTTPServer, status int, t *testing.T) { + handler := server.(*fakeHTTPServer).handler + req, err := http.NewRequest("GET", "/healthz", nil) + if err != nil { + t.Fatal(err) + } + resp := httptest.NewRecorder() + + handler.ServeHTTP(resp, req) + + if resp.Code != status { + t.Errorf("expected status code %v, got %v", status, resp.Code) + } + var payload healthzPayload + if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil { + t.Fatal(err) + } +} diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index b3796aeee50..cfbb0aeab7e 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -301,6 +301,7 @@ type Proxier struct { portMapper portOpener recorder record.EventRecorder healthChecker healthcheck.Server + healthzServer healthcheck.HealthzUpdater } type localPort struct { @@ -351,6 +352,7 @@ func NewProxier(ipt utiliptables.Interface, hostname string, nodeIP net.IP, recorder record.EventRecorder, + healthzServer healthcheck.HealthzUpdater, ) (*Proxier, error) { // check valid user input if minSyncPeriod > syncPeriod { @@ -414,6 +416,7 @@ func NewProxier(ipt utiliptables.Interface, portMapper: &listenPortOpener{}, recorder: recorder, healthChecker: healthChecker, + healthzServer: healthzServer, }, nil } @@ -513,6 +516,10 @@ func (proxier *Proxier) Sync() { func (proxier *Proxier) SyncLoop() { t := time.NewTicker(proxier.syncPeriod) defer t.Stop() + // Update healthz timestamp at beginning in case Sync() never succeeds. + if proxier.healthzServer != nil { + proxier.healthzServer.UpdateTimestamp() + } for { <-t.C glog.V(6).Infof("Periodic sync") @@ -1488,6 +1495,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } proxier.portsMap = replacementPortsMap + // Update healthz timestamp if it is periodic sync. + if proxier.healthzServer != nil && reason == syncReasonForce { + proxier.healthzServer.UpdateTimestamp() + } + // Update healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the healthChecker // will just drop those endpoints. diff --git a/test/e2e/framework/networking_utils.go b/test/e2e/framework/networking_utils.go index ca30f3848b3..c08a239360b 100644 --- a/test/e2e/framework/networking_utils.go +++ b/test/e2e/framework/networking_utils.go @@ -273,8 +273,8 @@ func (config *NetworkingTestConfig) DialFromNode(protocol, targetIP string, targ // GetSelfURL executes a curl against the given path via kubectl exec into a // test container running with host networking, and fails if the output // doesn't match the expected string. -func (config *NetworkingTestConfig) GetSelfURL(path string, expected string) { - cmd := fmt.Sprintf("curl -q -s --connect-timeout 1 http://localhost:10249%s", path) +func (config *NetworkingTestConfig) GetSelfURL(port int32, path string, expected string) { + cmd := fmt.Sprintf("curl -i -q -s --connect-timeout 1 http://localhost:%d%s", port, path) By(fmt.Sprintf("Getting kube-proxy self URL %s", path)) // These are arbitrary timeouts. The curl command should pass on first try, diff --git a/test/e2e/networking.go b/test/e2e/networking.go index 15d722ad627..9834a5efcba 100644 --- a/test/e2e/networking.go +++ b/test/e2e/networking.go @@ -20,9 +20,11 @@ import ( "fmt" "net/http" - . "github.com/onsi/ginkgo" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" ) var _ = framework.KubeDescribe("Networking", func() { @@ -80,8 +82,8 @@ var _ = framework.KubeDescribe("Networking", func() { config := framework.NewNetworkingTestConfig(f) By("checking kube-proxy URLs") - config.GetSelfURL("/healthz", "ok") - config.GetSelfURL("/proxyMode", "iptables") // the default + config.GetSelfURL(ports.ProxyHealthzPort, "/healthz", "200 OK") + config.GetSelfURL(ports.ProxyStatusPort, "/proxyMode", "iptables") // the default }) // TODO: Remove [Slow] when this has had enough bake time to prove presubmit worthiness.