mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #44968 from MrHohn/kube-proxy-healthcheck
Automatic merge from submit-queue (batch tested with PRs 44727, 45409, 44968, 45122, 45493) Separate healthz server from metrics server in kube-proxy From #14661, proposal is on kubernetes/community#552. Couple bullet points as in commit: - /healthz will be served on 0.0.0.0:10256 by default. - /metrics and /proxyMode will be served on port 10249 as before. - Healthz handler will verify timestamp in iptables mode. /assign @nicksardo @bowei @thockin **Release note**: ```release-note NONE ```
This commit is contained in:
commit
332b095ca9
@ -31,7 +31,6 @@ go_library(
|
|||||||
"//pkg/version/prometheus:go_default_library",
|
"//pkg/version/prometheus:go_default_library",
|
||||||
"//pkg/version/verflag:go_default_library",
|
"//pkg/version/verflag:go_default_library",
|
||||||
"//vendor/github.com/spf13/pflag:go_default_library",
|
"//vendor/github.com/spf13/pflag:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
|
|
||||||
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library",
|
||||||
],
|
],
|
||||||
|
@ -25,6 +25,7 @@ go_library(
|
|||||||
"//pkg/kubelet/qos:go_default_library",
|
"//pkg/kubelet/qos:go_default_library",
|
||||||
"//pkg/proxy:go_default_library",
|
"//pkg/proxy:go_default_library",
|
||||||
"//pkg/proxy/config:go_default_library",
|
"//pkg/proxy/config:go_default_library",
|
||||||
|
"//pkg/proxy/healthcheck:go_default_library",
|
||||||
"//pkg/proxy/iptables:go_default_library",
|
"//pkg/proxy/iptables:go_default_library",
|
||||||
"//pkg/proxy/userspace:go_default_library",
|
"//pkg/proxy/userspace:go_default_library",
|
||||||
"//pkg/proxy/winuserspace:go_default_library",
|
"//pkg/proxy/winuserspace:go_default_library",
|
||||||
|
@ -51,6 +51,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/qos"
|
"k8s.io/kubernetes/pkg/kubelet/qos"
|
||||||
"k8s.io/kubernetes/pkg/proxy"
|
"k8s.io/kubernetes/pkg/proxy"
|
||||||
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
|
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
|
||||||
|
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||||
"k8s.io/kubernetes/pkg/proxy/iptables"
|
"k8s.io/kubernetes/pkg/proxy/iptables"
|
||||||
"k8s.io/kubernetes/pkg/proxy/userspace"
|
"k8s.io/kubernetes/pkg/proxy/userspace"
|
||||||
"k8s.io/kubernetes/pkg/proxy/winuserspace"
|
"k8s.io/kubernetes/pkg/proxy/winuserspace"
|
||||||
@ -249,7 +250,7 @@ func applyDefaults(in *componentconfig.KubeProxyConfiguration) (*componentconfig
|
|||||||
func NewProxyCommand() *cobra.Command {
|
func NewProxyCommand() *cobra.Command {
|
||||||
opts := Options{
|
opts := Options{
|
||||||
config: new(componentconfig.KubeProxyConfiguration),
|
config: new(componentconfig.KubeProxyConfiguration),
|
||||||
healthzPort: 10249,
|
healthzPort: 10256,
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
@ -296,7 +297,7 @@ type ProxyServer struct {
|
|||||||
ProxyMode string
|
ProxyMode string
|
||||||
NodeRef *clientv1.ObjectReference
|
NodeRef *clientv1.ObjectReference
|
||||||
CleanupAndExit bool
|
CleanupAndExit bool
|
||||||
HealthzBindAddress string
|
MetricsBindAddress string
|
||||||
OOMScoreAdj *int32
|
OOMScoreAdj *int32
|
||||||
ResourceContainer string
|
ResourceContainer string
|
||||||
ConfigSyncPeriod time.Duration
|
ConfigSyncPeriod time.Duration
|
||||||
@ -305,6 +306,7 @@ type ProxyServer struct {
|
|||||||
// get rid of this one.
|
// get rid of this one.
|
||||||
ServiceHandler proxyconfig.ServiceConfigHandler
|
ServiceHandler proxyconfig.ServiceConfigHandler
|
||||||
EndpointsEventHandler proxyconfig.EndpointsHandler
|
EndpointsEventHandler proxyconfig.EndpointsHandler
|
||||||
|
HealthzServer *healthcheck.HealthzServer
|
||||||
}
|
}
|
||||||
|
|
||||||
// createClients creates a kube client and an event client from the given config and masterOverride.
|
// createClients creates a kube client and an event client from the given config and masterOverride.
|
||||||
@ -388,6 +390,11 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
|
|||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname})
|
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname})
|
||||||
|
|
||||||
|
var healthzServer *healthcheck.HealthzServer
|
||||||
|
if len(config.HealthzBindAddress) > 0 {
|
||||||
|
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
var proxier proxy.ProxyProvider
|
var proxier proxy.ProxyProvider
|
||||||
var serviceEventHandler proxyconfig.ServiceHandler
|
var serviceEventHandler proxyconfig.ServiceHandler
|
||||||
// TODO: Migrate all handlers to ServiceHandler types and
|
// TODO: Migrate all handlers to ServiceHandler types and
|
||||||
@ -416,6 +423,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
|
|||||||
hostname,
|
hostname,
|
||||||
getNodeIP(client, hostname),
|
getNodeIP(client, hostname),
|
||||||
recorder,
|
recorder,
|
||||||
|
healthzServer,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
||||||
@ -504,13 +512,14 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
|
|||||||
Conntracker: &realConntracker{},
|
Conntracker: &realConntracker{},
|
||||||
ProxyMode: proxyMode,
|
ProxyMode: proxyMode,
|
||||||
NodeRef: nodeRef,
|
NodeRef: nodeRef,
|
||||||
HealthzBindAddress: config.HealthzBindAddress,
|
MetricsBindAddress: config.MetricsBindAddress,
|
||||||
OOMScoreAdj: config.OOMScoreAdj,
|
OOMScoreAdj: config.OOMScoreAdj,
|
||||||
ResourceContainer: config.ResourceContainer,
|
ResourceContainer: config.ResourceContainer,
|
||||||
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
|
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
|
||||||
ServiceEventHandler: serviceEventHandler,
|
ServiceEventHandler: serviceEventHandler,
|
||||||
ServiceHandler: serviceHandler,
|
ServiceHandler: serviceHandler,
|
||||||
EndpointsEventHandler: endpointsEventHandler,
|
EndpointsEventHandler: endpointsEventHandler,
|
||||||
|
HealthzServer: healthzServer,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -546,17 +555,22 @@ func (s *ProxyServer) Run() error {
|
|||||||
|
|
||||||
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
|
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
|
||||||
|
|
||||||
// Start up a webserver if requested
|
// Start up a healthz server if requested
|
||||||
if len(s.HealthzBindAddress) > 0 {
|
if s.HealthzServer != nil {
|
||||||
|
s.HealthzServer.Run()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start up a metrics server if requested
|
||||||
|
if len(s.MetricsBindAddress) > 0 {
|
||||||
http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
|
http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
|
||||||
fmt.Fprintf(w, "%s", s.ProxyMode)
|
fmt.Fprintf(w, "%s", s.ProxyMode)
|
||||||
})
|
})
|
||||||
http.Handle("/metrics", prometheus.Handler())
|
http.Handle("/metrics", prometheus.Handler())
|
||||||
configz.InstallHandler(http.DefaultServeMux)
|
configz.InstallHandler(http.DefaultServeMux)
|
||||||
go wait.Until(func() {
|
go wait.Until(func() {
|
||||||
err := http.ListenAndServe(s.HealthzBindAddress, nil)
|
err := http.ListenAndServe(s.MetricsBindAddress, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("starting health server failed: %v", err))
|
utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
|
||||||
}
|
}
|
||||||
}, 5*time.Second, wait.NeverStop)
|
}, 5*time.Second, wait.NeverStop)
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
|
|
||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
|
|
||||||
"k8s.io/apiserver/pkg/server/healthz"
|
|
||||||
utilflag "k8s.io/apiserver/pkg/util/flag"
|
utilflag "k8s.io/apiserver/pkg/util/flag"
|
||||||
"k8s.io/apiserver/pkg/util/logs"
|
"k8s.io/apiserver/pkg/util/logs"
|
||||||
"k8s.io/kubernetes/cmd/kube-proxy/app"
|
"k8s.io/kubernetes/cmd/kube-proxy/app"
|
||||||
@ -32,8 +31,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
healthz.DefaultHealthz()
|
|
||||||
|
|
||||||
command := app.NewProxyCommand()
|
command := app.NewProxyCommand()
|
||||||
|
|
||||||
// TODO: once we switch everything over to Cobra commands, we can go back to calling
|
// TODO: once we switch everything over to Cobra commands, we can go back to calling
|
||||||
|
@ -199,6 +199,7 @@ pkg/kubelet/volumemanager/cache
|
|||||||
pkg/kubelet/volumemanager/populator
|
pkg/kubelet/volumemanager/populator
|
||||||
pkg/kubelet/volumemanager/reconciler
|
pkg/kubelet/volumemanager/reconciler
|
||||||
pkg/labels
|
pkg/labels
|
||||||
|
pkg/master/ports
|
||||||
pkg/printers
|
pkg/printers
|
||||||
pkg/proxy/config
|
pkg/proxy/config
|
||||||
pkg/proxy/healthcheck
|
pkg/proxy/healthcheck
|
||||||
|
@ -96,8 +96,11 @@ type KubeProxyConfiguration struct {
|
|||||||
// for all interfaces)
|
// for all interfaces)
|
||||||
BindAddress string
|
BindAddress string
|
||||||
// healthzBindAddress is the IP address and port for the health check server to serve on,
|
// healthzBindAddress is the IP address and port for the health check server to serve on,
|
||||||
// defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces)
|
// defaulting to 0.0.0.0:10256
|
||||||
HealthzBindAddress string
|
HealthzBindAddress string
|
||||||
|
// metricsBindAddress is the IP address and port for the metrics server to serve on,
|
||||||
|
// defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces)
|
||||||
|
MetricsBindAddress string
|
||||||
// clusterCIDR is the CIDR range of the pods in the cluster. It is used to
|
// clusterCIDR is the CIDR range of the pods in the cluster. It is used to
|
||||||
// bridge traffic coming from outside of the cluster. If not provided,
|
// bridge traffic coming from outside of the cluster. If not provided,
|
||||||
// no off-cluster bridging will be performed.
|
// no off-cluster bridging will be performed.
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package v1alpha1
|
package v1alpha1
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
@ -63,10 +64,15 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) {
|
|||||||
if len(obj.BindAddress) == 0 {
|
if len(obj.BindAddress) == 0 {
|
||||||
obj.BindAddress = "0.0.0.0"
|
obj.BindAddress = "0.0.0.0"
|
||||||
}
|
}
|
||||||
if len(obj.HealthzBindAddress) == 0 {
|
if obj.HealthzBindAddress == "" {
|
||||||
obj.HealthzBindAddress = "127.0.0.1:10249"
|
obj.HealthzBindAddress = fmt.Sprintf("0.0.0.0:%v", ports.ProxyHealthzPort)
|
||||||
} else if !strings.Contains(obj.HealthzBindAddress, ":") {
|
} else if !strings.Contains(obj.HealthzBindAddress, ":") {
|
||||||
obj.HealthzBindAddress = ":10249"
|
obj.HealthzBindAddress += fmt.Sprintf(":%v", ports.ProxyHealthzPort)
|
||||||
|
}
|
||||||
|
if obj.MetricsBindAddress == "" {
|
||||||
|
obj.MetricsBindAddress = fmt.Sprintf("127.0.0.1:%v", ports.ProxyStatusPort)
|
||||||
|
} else if !strings.Contains(obj.MetricsBindAddress, ":") {
|
||||||
|
obj.MetricsBindAddress += fmt.Sprintf(":%v", ports.ProxyStatusPort)
|
||||||
}
|
}
|
||||||
if obj.OOMScoreAdj == nil {
|
if obj.OOMScoreAdj == nil {
|
||||||
temp := int32(qos.KubeProxyOOMScoreAdj)
|
temp := int32(qos.KubeProxyOOMScoreAdj)
|
||||||
|
@ -92,8 +92,11 @@ type KubeProxyConfiguration struct {
|
|||||||
// for all interfaces)
|
// for all interfaces)
|
||||||
BindAddress string `json:"bindAddress"`
|
BindAddress string `json:"bindAddress"`
|
||||||
// healthzBindAddress is the IP address and port for the health check server to serve on,
|
// healthzBindAddress is the IP address and port for the health check server to serve on,
|
||||||
// defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces)
|
// defaulting to 0.0.0.0:10256
|
||||||
HealthzBindAddress string `json:"healthzBindAddress"`
|
HealthzBindAddress string `json:"healthzBindAddress"`
|
||||||
|
// metricsBindAddress is the IP address and port for the metrics server to serve on,
|
||||||
|
// defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces)
|
||||||
|
MetricsBindAddress string `json:"metricsBindAddress"`
|
||||||
// clusterCIDR is the CIDR range of the pods in the cluster. It is used to
|
// clusterCIDR is the CIDR range of the pods in the cluster. It is used to
|
||||||
// bridge traffic coming from outside of the cluster. If not provided,
|
// bridge traffic coming from outside of the cluster. If not provided,
|
||||||
// no off-cluster bridging will be performed.
|
// no off-cluster bridging will be performed.
|
||||||
|
@ -99,6 +99,7 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_componentconfig_KubeProxyCon
|
|||||||
out.FeatureGates = in.FeatureGates
|
out.FeatureGates = in.FeatureGates
|
||||||
out.BindAddress = in.BindAddress
|
out.BindAddress = in.BindAddress
|
||||||
out.HealthzBindAddress = in.HealthzBindAddress
|
out.HealthzBindAddress = in.HealthzBindAddress
|
||||||
|
out.MetricsBindAddress = in.MetricsBindAddress
|
||||||
out.ClusterCIDR = in.ClusterCIDR
|
out.ClusterCIDR = in.ClusterCIDR
|
||||||
out.HostnameOverride = in.HostnameOverride
|
out.HostnameOverride = in.HostnameOverride
|
||||||
if err := Convert_v1alpha1_ClientConnectionConfiguration_To_componentconfig_ClientConnectionConfiguration(&in.ClientConnection, &out.ClientConnection, s); err != nil {
|
if err := Convert_v1alpha1_ClientConnectionConfiguration_To_componentconfig_ClientConnectionConfiguration(&in.ClientConnection, &out.ClientConnection, s); err != nil {
|
||||||
@ -128,6 +129,7 @@ func autoConvert_componentconfig_KubeProxyConfiguration_To_v1alpha1_KubeProxyCon
|
|||||||
out.FeatureGates = in.FeatureGates
|
out.FeatureGates = in.FeatureGates
|
||||||
out.BindAddress = in.BindAddress
|
out.BindAddress = in.BindAddress
|
||||||
out.HealthzBindAddress = in.HealthzBindAddress
|
out.HealthzBindAddress = in.HealthzBindAddress
|
||||||
|
out.MetricsBindAddress = in.MetricsBindAddress
|
||||||
out.ClusterCIDR = in.ClusterCIDR
|
out.ClusterCIDR = in.ClusterCIDR
|
||||||
out.HostnameOverride = in.HostnameOverride
|
out.HostnameOverride = in.HostnameOverride
|
||||||
if err := Convert_componentconfig_ClientConnectionConfiguration_To_v1alpha1_ClientConnectionConfiguration(&in.ClientConnection, &out.ClientConnection, s); err != nil {
|
if err := Convert_componentconfig_ClientConnectionConfiguration_To_v1alpha1_ClientConnectionConfiguration(&in.ClientConnection, &out.ClientConnection, s); err != nil {
|
||||||
|
@ -17,7 +17,7 @@ limitations under the License.
|
|||||||
package ports
|
package ports
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// ProxyPort is the default port for the proxy healthz server.
|
// ProxyStatusPort is the default port for the proxy metrics server.
|
||||||
// May be overridden by a flag at startup.
|
// May be overridden by a flag at startup.
|
||||||
ProxyStatusPort = 10249
|
ProxyStatusPort = 10249
|
||||||
// KubeletPort is the default port for the kubelet server on each host machine.
|
// KubeletPort is the default port for the kubelet server on each host machine.
|
||||||
@ -38,4 +38,7 @@ const (
|
|||||||
// until heapster can transition to using the SSL endpoint.
|
// until heapster can transition to using the SSL endpoint.
|
||||||
// TODO(roberthbailey): Remove this once we have a better solution for heapster.
|
// TODO(roberthbailey): Remove this once we have a better solution for heapster.
|
||||||
KubeletReadOnlyPort = 10255
|
KubeletReadOnlyPort = 10255
|
||||||
|
// ProxyHealthzPort is the default port for the proxy healthz server.
|
||||||
|
// May be overridden by a flag at startup.
|
||||||
|
ProxyHealthzPort = 10256
|
||||||
)
|
)
|
||||||
|
@ -22,6 +22,7 @@ go_library(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
|
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/util/clock:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -34,6 +35,7 @@ go_test(
|
|||||||
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
|
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/util/clock:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -22,6 +22,8 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"github.com/renstrom/dedent"
|
"github.com/renstrom/dedent"
|
||||||
@ -29,6 +31,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
clientv1 "k8s.io/client-go/pkg/api/v1"
|
clientv1 "k8s.io/client-go/pkg/api/v1"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/client-go/util/clock"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -233,3 +236,92 @@ func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) erro
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HealthzUpdater allows callers to update healthz timestamp only.
|
||||||
|
type HealthzUpdater interface {
|
||||||
|
UpdateTimestamp()
|
||||||
|
}
|
||||||
|
|
||||||
|
// HealthzServer returns 200 "OK" by default. Once timestamp has been
|
||||||
|
// updated, it verifies we don't exceed max no respond duration since
|
||||||
|
// last update.
|
||||||
|
type HealthzServer struct {
|
||||||
|
listener Listener
|
||||||
|
httpFactory HTTPServerFactory
|
||||||
|
clock clock.Clock
|
||||||
|
|
||||||
|
addr string
|
||||||
|
port int32
|
||||||
|
healthTimeout time.Duration
|
||||||
|
|
||||||
|
lastUpdated atomic.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDefaultHealthzServer returns a default healthz http server.
|
||||||
|
func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer {
|
||||||
|
return newHealthzServer(nil, nil, nil, addr, healthTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer {
|
||||||
|
if listener == nil {
|
||||||
|
listener = stdNetListener{}
|
||||||
|
}
|
||||||
|
if httpServerFactory == nil {
|
||||||
|
httpServerFactory = stdHTTPServerFactory{}
|
||||||
|
}
|
||||||
|
if c == nil {
|
||||||
|
c = clock.RealClock{}
|
||||||
|
}
|
||||||
|
return &HealthzServer{
|
||||||
|
listener: listener,
|
||||||
|
httpFactory: httpServerFactory,
|
||||||
|
clock: c,
|
||||||
|
addr: addr,
|
||||||
|
healthTimeout: healthTimeout,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateTimestamp updates the lastUpdated timestamp.
|
||||||
|
func (hs *HealthzServer) UpdateTimestamp() {
|
||||||
|
hs.lastUpdated.Store(hs.clock.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts the healthz http server and returns.
|
||||||
|
func (hs *HealthzServer) Run() {
|
||||||
|
serveMux := http.NewServeMux()
|
||||||
|
serveMux.Handle("/healthz", healthzHandler{hs: hs})
|
||||||
|
server := hs.httpFactory.New(hs.addr, serveMux)
|
||||||
|
listener, err := hs.listener.Listen(hs.addr)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr)
|
||||||
|
if err := server.Serve(listener); err != nil {
|
||||||
|
glog.Errorf("Healhz closed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
glog.Errorf("Unexpected healhz closed.")
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
type healthzHandler struct {
|
||||||
|
hs *HealthzServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
|
||||||
|
lastUpdated := time.Time{}
|
||||||
|
if val := h.hs.lastUpdated.Load(); val != nil {
|
||||||
|
lastUpdated = val.(time.Time)
|
||||||
|
}
|
||||||
|
currentTime := h.hs.clock.Now()
|
||||||
|
|
||||||
|
resp.Header().Set("Content-Type", "application/json")
|
||||||
|
if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
|
||||||
|
resp.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
} else {
|
||||||
|
resp.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
|
||||||
|
}
|
||||||
|
@ -22,11 +22,13 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
"github.com/davecgh/go-spew/spew"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/client-go/util/clock"
|
||||||
|
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
)
|
)
|
||||||
|
|
||||||
type fakeListener struct {
|
type fakeListener struct {
|
||||||
@ -108,6 +110,11 @@ type hcPayload struct {
|
|||||||
LocalEndpoints int
|
LocalEndpoints int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type healthzPayload struct {
|
||||||
|
LastUpdated string
|
||||||
|
CurrentTime string
|
||||||
|
}
|
||||||
|
|
||||||
func TestServer(t *testing.T) {
|
func TestServer(t *testing.T) {
|
||||||
listener := newFakeListener()
|
listener := newFakeListener()
|
||||||
httpFactory := newFakeHTTPServerFactory()
|
httpFactory := newFakeHTTPServerFactory()
|
||||||
@ -355,3 +362,44 @@ func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints in
|
|||||||
t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints)
|
t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHealthzServer(t *testing.T) {
|
||||||
|
listener := newFakeListener()
|
||||||
|
httpFactory := newFakeHTTPServerFactory()
|
||||||
|
fakeClock := clock.NewFakeClock(time.Now())
|
||||||
|
|
||||||
|
hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
|
||||||
|
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
|
||||||
|
|
||||||
|
// Should return 200 "OK" by default.
|
||||||
|
testHealthzHandler(server, http.StatusOK, t)
|
||||||
|
|
||||||
|
// Should return 503 "ServiceUnavailable" if exceed max no respond duration.
|
||||||
|
hs.UpdateTimestamp()
|
||||||
|
fakeClock.Step(25 * time.Second)
|
||||||
|
testHealthzHandler(server, http.StatusServiceUnavailable, t)
|
||||||
|
|
||||||
|
// Should return 200 "OK" if timestamp is valid.
|
||||||
|
hs.UpdateTimestamp()
|
||||||
|
fakeClock.Step(5 * time.Second)
|
||||||
|
testHealthzHandler(server, http.StatusOK, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHealthzHandler(server HTTPServer, status int, t *testing.T) {
|
||||||
|
handler := server.(*fakeHTTPServer).handler
|
||||||
|
req, err := http.NewRequest("GET", "/healthz", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
|
||||||
|
handler.ServeHTTP(resp, req)
|
||||||
|
|
||||||
|
if resp.Code != status {
|
||||||
|
t.Errorf("expected status code %v, got %v", status, resp.Code)
|
||||||
|
}
|
||||||
|
var payload healthzPayload
|
||||||
|
if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -301,6 +301,7 @@ type Proxier struct {
|
|||||||
portMapper portOpener
|
portMapper portOpener
|
||||||
recorder record.EventRecorder
|
recorder record.EventRecorder
|
||||||
healthChecker healthcheck.Server
|
healthChecker healthcheck.Server
|
||||||
|
healthzServer healthcheck.HealthzUpdater
|
||||||
}
|
}
|
||||||
|
|
||||||
type localPort struct {
|
type localPort struct {
|
||||||
@ -351,6 +352,7 @@ func NewProxier(ipt utiliptables.Interface,
|
|||||||
hostname string,
|
hostname string,
|
||||||
nodeIP net.IP,
|
nodeIP net.IP,
|
||||||
recorder record.EventRecorder,
|
recorder record.EventRecorder,
|
||||||
|
healthzServer healthcheck.HealthzUpdater,
|
||||||
) (*Proxier, error) {
|
) (*Proxier, error) {
|
||||||
// check valid user input
|
// check valid user input
|
||||||
if minSyncPeriod > syncPeriod {
|
if minSyncPeriod > syncPeriod {
|
||||||
@ -414,6 +416,7 @@ func NewProxier(ipt utiliptables.Interface,
|
|||||||
portMapper: &listenPortOpener{},
|
portMapper: &listenPortOpener{},
|
||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
healthChecker: healthChecker,
|
healthChecker: healthChecker,
|
||||||
|
healthzServer: healthzServer,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -513,6 +516,10 @@ func (proxier *Proxier) Sync() {
|
|||||||
func (proxier *Proxier) SyncLoop() {
|
func (proxier *Proxier) SyncLoop() {
|
||||||
t := time.NewTicker(proxier.syncPeriod)
|
t := time.NewTicker(proxier.syncPeriod)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
// Update healthz timestamp at beginning in case Sync() never succeeds.
|
||||||
|
if proxier.healthzServer != nil {
|
||||||
|
proxier.healthzServer.UpdateTimestamp()
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
<-t.C
|
<-t.C
|
||||||
glog.V(6).Infof("Periodic sync")
|
glog.V(6).Infof("Periodic sync")
|
||||||
@ -1488,6 +1495,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
|
|||||||
}
|
}
|
||||||
proxier.portsMap = replacementPortsMap
|
proxier.portsMap = replacementPortsMap
|
||||||
|
|
||||||
|
// Update healthz timestamp if it is periodic sync.
|
||||||
|
if proxier.healthzServer != nil && reason == syncReasonForce {
|
||||||
|
proxier.healthzServer.UpdateTimestamp()
|
||||||
|
}
|
||||||
|
|
||||||
// Update healthchecks. The endpoints list might include services that are
|
// Update healthchecks. The endpoints list might include services that are
|
||||||
// not "OnlyLocal", but the services list will not, and the healthChecker
|
// not "OnlyLocal", but the services list will not, and the healthChecker
|
||||||
// will just drop those endpoints.
|
// will just drop those endpoints.
|
||||||
|
@ -273,8 +273,8 @@ func (config *NetworkingTestConfig) DialFromNode(protocol, targetIP string, targ
|
|||||||
// GetSelfURL executes a curl against the given path via kubectl exec into a
|
// GetSelfURL executes a curl against the given path via kubectl exec into a
|
||||||
// test container running with host networking, and fails if the output
|
// test container running with host networking, and fails if the output
|
||||||
// doesn't match the expected string.
|
// doesn't match the expected string.
|
||||||
func (config *NetworkingTestConfig) GetSelfURL(path string, expected string) {
|
func (config *NetworkingTestConfig) GetSelfURL(port int32, path string, expected string) {
|
||||||
cmd := fmt.Sprintf("curl -q -s --connect-timeout 1 http://localhost:10249%s", path)
|
cmd := fmt.Sprintf("curl -i -q -s --connect-timeout 1 http://localhost:%d%s", port, path)
|
||||||
By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
|
By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
|
||||||
|
|
||||||
// These are arbitrary timeouts. The curl command should pass on first try,
|
// These are arbitrary timeouts. The curl command should pass on first try,
|
||||||
|
@ -20,9 +20,11 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/kubernetes/pkg/master/ports"
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
"k8s.io/kubernetes/test/e2e/framework"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = framework.KubeDescribe("Networking", func() {
|
var _ = framework.KubeDescribe("Networking", func() {
|
||||||
@ -80,8 +82,8 @@ var _ = framework.KubeDescribe("Networking", func() {
|
|||||||
config := framework.NewNetworkingTestConfig(f)
|
config := framework.NewNetworkingTestConfig(f)
|
||||||
|
|
||||||
By("checking kube-proxy URLs")
|
By("checking kube-proxy URLs")
|
||||||
config.GetSelfURL("/healthz", "ok")
|
config.GetSelfURL(ports.ProxyHealthzPort, "/healthz", "200 OK")
|
||||||
config.GetSelfURL("/proxyMode", "iptables") // the default
|
config.GetSelfURL(ports.ProxyStatusPort, "/proxyMode", "iptables") // the default
|
||||||
})
|
})
|
||||||
|
|
||||||
// TODO: Remove [Slow] when this has had enough bake time to prove presubmit worthiness.
|
// TODO: Remove [Slow] when this has had enough bake time to prove presubmit worthiness.
|
||||||
|
Loading…
Reference in New Issue
Block a user