mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Merge pull request #83498 from danwinship/proxy-health
Fix kube-proxy healthz server for proxier sync loop changes
This commit is contained in:
commit
af6f302e46
@ -482,7 +482,7 @@ type ProxyServer struct {
|
||||
UseEndpointSlices bool
|
||||
OOMScoreAdj *int32
|
||||
ConfigSyncPeriod time.Duration
|
||||
HealthzServer *healthcheck.HealthzServer
|
||||
HealthzServer *healthcheck.ProxierHealthServer
|
||||
}
|
||||
|
||||
// createClients creates a kube client and an event client from the given config and masterOverride.
|
||||
|
@ -125,11 +125,9 @@ func newProxyServer(
|
||||
Namespace: "",
|
||||
}
|
||||
|
||||
var healthzServer *healthcheck.HealthzServer
|
||||
var healthzUpdater healthcheck.HealthzUpdater
|
||||
var healthzServer *healthcheck.ProxierHealthServer
|
||||
if len(config.HealthzBindAddress) > 0 {
|
||||
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
healthzUpdater = healthzServer
|
||||
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
}
|
||||
|
||||
var proxier proxy.Provider
|
||||
@ -162,7 +160,7 @@ func newProxyServer(
|
||||
hostname,
|
||||
nodeIP,
|
||||
recorder,
|
||||
healthzUpdater,
|
||||
healthzServer,
|
||||
config.NodePortAddresses,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -87,11 +87,9 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
|
||||
Namespace: "",
|
||||
}
|
||||
|
||||
var healthzServer *healthcheck.HealthzServer
|
||||
var healthzUpdater healthcheck.HealthzUpdater
|
||||
var healthzServer *healthcheck.ProxierHealthServer
|
||||
if len(config.HealthzBindAddress) > 0 {
|
||||
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
healthzUpdater = healthzServer
|
||||
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
|
||||
}
|
||||
|
||||
var proxier proxy.Provider
|
||||
@ -108,7 +106,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
|
||||
hostname,
|
||||
utilnode.GetNodeIP(client, hostname),
|
||||
recorder,
|
||||
healthzUpdater,
|
||||
healthzServer,
|
||||
config.Winkernel,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -9,8 +9,10 @@ load(
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"common.go",
|
||||
"doc.go",
|
||||
"healthcheck.go",
|
||||
"proxier_health.go",
|
||||
"service_health.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/proxy/healthcheck",
|
||||
deps = [
|
||||
|
63
pkg/proxy/healthcheck/common.go
Normal file
63
pkg/proxy/healthcheck/common.go
Normal file
@ -0,0 +1,63 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// listener allows for testing of ServiceHealthServer and ProxierHealthServer.
|
||||
type listener interface {
|
||||
// Listen is very much like net.Listen, except the first arg (network) is
|
||||
// fixed to be "tcp".
|
||||
Listen(addr string) (net.Listener, error)
|
||||
}
|
||||
|
||||
// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer.
|
||||
type httpServerFactory interface {
|
||||
// New creates an instance of a type satisfying HTTPServer. This is
|
||||
// designed to include http.Server.
|
||||
New(addr string, handler http.Handler) httpServer
|
||||
}
|
||||
|
||||
// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer.
|
||||
// It is designed so that http.Server satisfies this interface,
|
||||
type httpServer interface {
|
||||
Serve(listener net.Listener) error
|
||||
}
|
||||
|
||||
// Implement listener in terms of net.Listen.
|
||||
type stdNetListener struct{}
|
||||
|
||||
func (stdNetListener) Listen(addr string) (net.Listener, error) {
|
||||
return net.Listen("tcp", addr)
|
||||
}
|
||||
|
||||
var _ listener = stdNetListener{}
|
||||
|
||||
// Implement httpServerFactory in terms of http.Server.
|
||||
type stdHTTPServerFactory struct{}
|
||||
|
||||
func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
var _ httpServerFactory = stdHTTPServerFactory{}
|
@ -79,7 +79,7 @@ func newFakeHTTPServerFactory() *fakeHTTPServerFactory {
|
||||
return &fakeHTTPServerFactory{}
|
||||
}
|
||||
|
||||
func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
|
||||
func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
|
||||
return &fakeHTTPServer{
|
||||
addr: addr,
|
||||
handler: handler,
|
||||
@ -119,7 +119,7 @@ func TestServer(t *testing.T) {
|
||||
listener := newFakeListener()
|
||||
httpFactory := newFakeHTTPServerFactory()
|
||||
|
||||
hcsi := NewServer("hostname", nil, listener, httpFactory)
|
||||
hcsi := newServiceHealthServer("hostname", nil, listener, httpFactory)
|
||||
hcs := hcsi.(*server)
|
||||
if len(hcs.services) != 0 {
|
||||
t.Errorf("expected 0 services, got %d", len(hcs.services))
|
||||
@ -368,24 +368,32 @@ func TestHealthzServer(t *testing.T) {
|
||||
httpFactory := newFakeHTTPServerFactory()
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
|
||||
hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
|
||||
hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
|
||||
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
|
||||
|
||||
// Should return 200 "OK" by default.
|
||||
testHealthzHandler(server, http.StatusOK, t)
|
||||
|
||||
// Should return 503 "ServiceUnavailable" if exceed max no respond duration.
|
||||
hs.UpdateTimestamp()
|
||||
// Should return 200 "OK" after first update
|
||||
hs.Updated()
|
||||
testHealthzHandler(server, http.StatusOK, t)
|
||||
|
||||
// Should continue to return 200 "OK" as long as no further updates are queued
|
||||
fakeClock.Step(25 * time.Second)
|
||||
testHealthzHandler(server, http.StatusOK, t)
|
||||
|
||||
// Should return 503 "ServiceUnavailable" if exceed max update-processing time
|
||||
hs.QueuedUpdate()
|
||||
fakeClock.Step(25 * time.Second)
|
||||
testHealthzHandler(server, http.StatusServiceUnavailable, t)
|
||||
|
||||
// Should return 200 "OK" if timestamp is valid.
|
||||
hs.UpdateTimestamp()
|
||||
// Should return 200 "OK" after processing update
|
||||
hs.Updated()
|
||||
fakeClock.Step(5 * time.Second)
|
||||
testHealthzHandler(server, http.StatusOK, t)
|
||||
}
|
||||
|
||||
func testHealthzHandler(server HTTPServer, status int, t *testing.T) {
|
||||
func testHealthzHandler(server httpServer, status int, t *testing.T) {
|
||||
handler := server.(*fakeHTTPServer).handler
|
||||
req, err := http.NewRequest("GET", "/healthz", nil)
|
||||
if err != nil {
|
||||
|
163
pkg/proxy/healthcheck/proxier_health.go
Normal file
163
pkg/proxy/healthcheck/proxier_health.go
Normal file
@ -0,0 +1,163 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/record"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
)
|
||||
|
||||
var proxierHealthzRetryInterval = 60 * time.Second
|
||||
|
||||
// ProxierHealthUpdater allows callers to update healthz timestamp only.
|
||||
type ProxierHealthUpdater interface {
|
||||
// QueuedUpdate should be called when the proxier receives a Service or Endpoints
|
||||
// event containing information that requires updating service rules.
|
||||
QueuedUpdate()
|
||||
|
||||
// Updated should be called when the proxier has successfully updated the service
|
||||
// rules to reflect the current state.
|
||||
Updated()
|
||||
}
|
||||
|
||||
// ProxierHealthServer returns 200 "OK" by default. It verifies that the delay between
|
||||
// QueuedUpdate() calls and Updated() calls never exceeds healthTimeout.
|
||||
type ProxierHealthServer struct {
|
||||
listener listener
|
||||
httpFactory httpServerFactory
|
||||
clock clock.Clock
|
||||
|
||||
addr string
|
||||
port int32
|
||||
healthTimeout time.Duration
|
||||
recorder record.EventRecorder
|
||||
nodeRef *v1.ObjectReference
|
||||
|
||||
lastUpdated atomic.Value
|
||||
lastQueued atomic.Value
|
||||
}
|
||||
|
||||
// NewProxierHealthServer returns a proxier health http server.
|
||||
func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *ProxierHealthServer {
|
||||
return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, recorder, nodeRef)
|
||||
}
|
||||
|
||||
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *ProxierHealthServer {
|
||||
return &ProxierHealthServer{
|
||||
listener: listener,
|
||||
httpFactory: httpServerFactory,
|
||||
clock: c,
|
||||
addr: addr,
|
||||
healthTimeout: healthTimeout,
|
||||
recorder: recorder,
|
||||
nodeRef: nodeRef,
|
||||
}
|
||||
}
|
||||
|
||||
// Updated updates the lastUpdated timestamp.
|
||||
func (hs *ProxierHealthServer) Updated() {
|
||||
hs.lastUpdated.Store(hs.clock.Now())
|
||||
}
|
||||
|
||||
// QueuedUpdate updates the lastQueued timestamp.
|
||||
func (hs *ProxierHealthServer) QueuedUpdate() {
|
||||
hs.lastQueued.Store(hs.clock.Now())
|
||||
}
|
||||
|
||||
// Run starts the healthz http server and returns.
|
||||
func (hs *ProxierHealthServer) Run() {
|
||||
serveMux := http.NewServeMux()
|
||||
serveMux.Handle("/healthz", healthzHandler{hs: hs})
|
||||
server := hs.httpFactory.New(hs.addr, serveMux)
|
||||
|
||||
go wait.Until(func() {
|
||||
klog.V(3).Infof("Starting goroutine for proxier healthz on %s", hs.addr)
|
||||
|
||||
listener, err := hs.listener.Listen(hs.addr)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Failed to start proxier healthz on %s: %v", hs.addr, err)
|
||||
if hs.recorder != nil {
|
||||
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartProxierHealthcheck", msg)
|
||||
}
|
||||
klog.Error(msg)
|
||||
return
|
||||
}
|
||||
|
||||
if err := server.Serve(listener); err != nil {
|
||||
klog.Errorf("Proxier healthz closed with error: %v", err)
|
||||
return
|
||||
}
|
||||
klog.Error("Unexpected proxier healthz closed.")
|
||||
}, proxierHealthzRetryInterval, wait.NeverStop)
|
||||
}
|
||||
|
||||
type healthzHandler struct {
|
||||
hs *ProxierHealthServer
|
||||
}
|
||||
|
||||
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
|
||||
var lastQueued, lastUpdated time.Time
|
||||
if val := h.hs.lastQueued.Load(); val != nil {
|
||||
lastQueued = val.(time.Time)
|
||||
}
|
||||
if val := h.hs.lastUpdated.Load(); val != nil {
|
||||
lastUpdated = val.(time.Time)
|
||||
}
|
||||
currentTime := h.hs.clock.Now()
|
||||
|
||||
healthy := false
|
||||
switch {
|
||||
case lastUpdated.IsZero():
|
||||
// The proxy is healthy while it's starting up
|
||||
// TODO: this makes it useless as a readinessProbe. Consider changing
|
||||
// to only become healthy after the proxy is fully synced.
|
||||
healthy = true
|
||||
case lastUpdated.After(lastQueued):
|
||||
// We've processed all updates
|
||||
healthy = true
|
||||
case currentTime.Sub(lastQueued) < h.hs.healthTimeout:
|
||||
// There's an unprocessed update queued, but it's not late yet
|
||||
healthy = true
|
||||
}
|
||||
|
||||
resp.Header().Set("Content-Type", "application/json")
|
||||
resp.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
if !healthy {
|
||||
resp.WriteHeader(http.StatusServiceUnavailable)
|
||||
} else {
|
||||
resp.WriteHeader(http.StatusOK)
|
||||
|
||||
// In older releases, the returned "lastUpdated" time indicated the last
|
||||
// time the proxier sync loop ran, even if nothing had changed. To
|
||||
// preserve compatibility, we use the same semantics: the returned
|
||||
// lastUpdated value is "recent" if the server is healthy. The kube-proxy
|
||||
// metrics provide more detailed information.
|
||||
lastUpdated = currentTime
|
||||
|
||||
}
|
||||
fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
|
||||
}
|
@ -22,27 +22,21 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/lithammer/dedent"
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/record"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
)
|
||||
|
||||
var nodeHealthzRetryInterval = 60 * time.Second
|
||||
|
||||
// Server serves HTTP endpoints for each service name, with results
|
||||
// ServiceHealthServer serves HTTP endpoints for each service name, with results
|
||||
// based on the endpoints. If there are 0 endpoints for a service, it returns a
|
||||
// 503 "Service Unavailable" error (telling LBs not to use this node). If there
|
||||
// are 1 or more endpoints, it returns a 200 "OK".
|
||||
type Server interface {
|
||||
type ServiceHealthServer interface {
|
||||
// Make the new set of services be active. Services that were open before
|
||||
// will be closed. Services that are new will be opened. Service that
|
||||
// existed and are in the new set will be left alone. The value of the map
|
||||
@ -54,73 +48,26 @@ type Server interface {
|
||||
SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
|
||||
}
|
||||
|
||||
// Listener allows for testing of Server. If the Listener argument
|
||||
// to NewServer() is nil, the real net.Listen function will be used.
|
||||
type Listener interface {
|
||||
// Listen is very much like net.Listen, except the first arg (network) is
|
||||
// fixed to be "tcp".
|
||||
Listen(addr string) (net.Listener, error)
|
||||
}
|
||||
|
||||
// HTTPServerFactory allows for testing of Server. If the
|
||||
// HTTPServerFactory argument to NewServer() is nil, the real
|
||||
// http.Server type will be used.
|
||||
type HTTPServerFactory interface {
|
||||
// New creates an instance of a type satisfying HTTPServer. This is
|
||||
// designed to include http.Server.
|
||||
New(addr string, handler http.Handler) HTTPServer
|
||||
}
|
||||
|
||||
// HTTPServer allows for testing of Server.
|
||||
type HTTPServer interface {
|
||||
// Server is designed so that http.Server satisfies this interface,
|
||||
Serve(listener net.Listener) error
|
||||
}
|
||||
|
||||
// NewServer allocates a new healthcheck server manager. If either
|
||||
// of the injected arguments are nil, defaults will be used.
|
||||
func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server {
|
||||
if listener == nil {
|
||||
listener = stdNetListener{}
|
||||
}
|
||||
if httpServerFactory == nil {
|
||||
httpServerFactory = stdHTTPServerFactory{}
|
||||
}
|
||||
func newServiceHealthServer(hostname string, recorder record.EventRecorder, listener listener, factory httpServerFactory) ServiceHealthServer {
|
||||
return &server{
|
||||
hostname: hostname,
|
||||
recorder: recorder,
|
||||
listener: listener,
|
||||
httpFactory: httpServerFactory,
|
||||
httpFactory: factory,
|
||||
services: map[types.NamespacedName]*hcInstance{},
|
||||
}
|
||||
}
|
||||
|
||||
// Implement Listener in terms of net.Listen.
|
||||
type stdNetListener struct{}
|
||||
|
||||
func (stdNetListener) Listen(addr string) (net.Listener, error) {
|
||||
return net.Listen("tcp", addr)
|
||||
// NewServiceHealthServer allocates a new service healthcheck server manager
|
||||
func NewServiceHealthServer(hostname string, recorder record.EventRecorder) ServiceHealthServer {
|
||||
return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{})
|
||||
}
|
||||
|
||||
var _ Listener = stdNetListener{}
|
||||
|
||||
// Implement HTTPServerFactory in terms of http.Server.
|
||||
type stdHTTPServerFactory struct{}
|
||||
|
||||
func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer {
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
var _ HTTPServerFactory = stdHTTPServerFactory{}
|
||||
|
||||
type server struct {
|
||||
hostname string
|
||||
recorder record.EventRecorder // can be nil
|
||||
listener Listener
|
||||
httpFactory HTTPServerFactory
|
||||
listener listener
|
||||
httpFactory httpServerFactory
|
||||
|
||||
lock sync.RWMutex
|
||||
services map[types.NamespacedName]*hcInstance
|
||||
@ -187,7 +134,7 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
|
||||
type hcInstance struct {
|
||||
port uint16
|
||||
listener net.Listener
|
||||
server HTTPServer
|
||||
server httpServer
|
||||
endpoints int // number of local endpoints for a service
|
||||
}
|
||||
|
||||
@ -247,103 +194,20 @@ func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// HealthzUpdater allows callers to update healthz timestamp only.
|
||||
type HealthzUpdater interface {
|
||||
UpdateTimestamp()
|
||||
// FakeServiceHealthServer is a fake ServiceHealthServer for test programs
|
||||
type FakeServiceHealthServer struct{}
|
||||
|
||||
// NewFakeServiceHealthServer allocates a new fake service healthcheck server manager
|
||||
func NewFakeServiceHealthServer() ServiceHealthServer {
|
||||
return FakeServiceHealthServer{}
|
||||
}
|
||||
|
||||
// HealthzServer returns 200 "OK" by default. Once timestamp has been
|
||||
// updated, it verifies we don't exceed max no respond duration since
|
||||
// last update.
|
||||
type HealthzServer struct {
|
||||
listener Listener
|
||||
httpFactory HTTPServerFactory
|
||||
clock clock.Clock
|
||||
|
||||
addr string
|
||||
port int32
|
||||
healthTimeout time.Duration
|
||||
recorder record.EventRecorder
|
||||
nodeRef *v1.ObjectReference
|
||||
|
||||
lastUpdated atomic.Value
|
||||
// SyncServices is part of ServiceHealthServer
|
||||
func (fake FakeServiceHealthServer) SyncServices(_ map[types.NamespacedName]uint16) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewDefaultHealthzServer returns a default healthz http server.
|
||||
func NewDefaultHealthzServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
|
||||
return newHealthzServer(nil, nil, nil, addr, healthTimeout, recorder, nodeRef)
|
||||
}
|
||||
|
||||
func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
|
||||
if listener == nil {
|
||||
listener = stdNetListener{}
|
||||
}
|
||||
if httpServerFactory == nil {
|
||||
httpServerFactory = stdHTTPServerFactory{}
|
||||
}
|
||||
if c == nil {
|
||||
c = clock.RealClock{}
|
||||
}
|
||||
return &HealthzServer{
|
||||
listener: listener,
|
||||
httpFactory: httpServerFactory,
|
||||
clock: c,
|
||||
addr: addr,
|
||||
healthTimeout: healthTimeout,
|
||||
recorder: recorder,
|
||||
nodeRef: nodeRef,
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateTimestamp updates the lastUpdated timestamp.
|
||||
func (hs *HealthzServer) UpdateTimestamp() {
|
||||
hs.lastUpdated.Store(hs.clock.Now())
|
||||
}
|
||||
|
||||
// Run starts the healthz http server and returns.
|
||||
func (hs *HealthzServer) Run() {
|
||||
serveMux := http.NewServeMux()
|
||||
serveMux.Handle("/healthz", healthzHandler{hs: hs})
|
||||
server := hs.httpFactory.New(hs.addr, serveMux)
|
||||
|
||||
go wait.Until(func() {
|
||||
klog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr)
|
||||
|
||||
listener, err := hs.listener.Listen(hs.addr)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Failed to start node healthz on %s: %v", hs.addr, err)
|
||||
if hs.recorder != nil {
|
||||
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartNodeHealthcheck", msg)
|
||||
}
|
||||
klog.Error(msg)
|
||||
return
|
||||
}
|
||||
|
||||
if err := server.Serve(listener); err != nil {
|
||||
klog.Errorf("Healthz closed with error: %v", err)
|
||||
return
|
||||
}
|
||||
klog.Error("Unexpected healthz closed.")
|
||||
}, nodeHealthzRetryInterval, wait.NeverStop)
|
||||
}
|
||||
|
||||
type healthzHandler struct {
|
||||
hs *HealthzServer
|
||||
}
|
||||
|
||||
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
|
||||
lastUpdated := time.Time{}
|
||||
if val := h.hs.lastUpdated.Load(); val != nil {
|
||||
lastUpdated = val.(time.Time)
|
||||
}
|
||||
currentTime := h.hs.clock.Now()
|
||||
|
||||
resp.Header().Set("Content-Type", "application/json")
|
||||
resp.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
|
||||
resp.WriteHeader(http.StatusServiceUnavailable)
|
||||
} else {
|
||||
resp.WriteHeader(http.StatusOK)
|
||||
}
|
||||
fmt.Fprintf(resp, fmt.Sprintf(`{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime))
|
||||
// SyncEndpoints is part of ServiceHealthServer
|
||||
func (fake FakeServiceHealthServer) SyncEndpoints(_ map[types.NamespacedName]int) error {
|
||||
return nil
|
||||
}
|
@ -38,6 +38,7 @@ go_test(
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/proxy/util/testing:go_default_library",
|
||||
"//pkg/util/async:go_default_library",
|
||||
|
@ -200,8 +200,9 @@ type Proxier struct {
|
||||
nodeIP net.IP
|
||||
portMapper utilproxy.PortOpener
|
||||
recorder record.EventRecorder
|
||||
healthChecker healthcheck.Server
|
||||
healthzServer healthcheck.HealthzUpdater
|
||||
|
||||
serviceHealthServer healthcheck.ServiceHealthServer
|
||||
healthzServer healthcheck.ProxierHealthUpdater
|
||||
|
||||
// Since converting probabilities (floats) to strings is expensive
|
||||
// and we are using only probabilities in the format of 1/n, we are
|
||||
@ -257,7 +258,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
hostname string,
|
||||
nodeIP net.IP,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
healthzServer healthcheck.ProxierHealthUpdater,
|
||||
nodePortAddresses []string,
|
||||
) (*Proxier, error) {
|
||||
// Set the route_localnet sysctl we need for
|
||||
@ -291,7 +292,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
|
||||
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
|
||||
|
||||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
|
||||
|
||||
isIPv6 := ipt.IsIpv6()
|
||||
proxier := &Proxier{
|
||||
@ -309,7 +310,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &listenPortOpener{},
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
serviceHealthServer: serviceHealthServer,
|
||||
healthzServer: healthzServer,
|
||||
precomputedProbabilities: make([]string, 0, 1001),
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
@ -461,6 +462,9 @@ func (proxier *Proxier) probability(n int) string {
|
||||
|
||||
// Sync is called to synchronize the proxier state to iptables as soon as possible.
|
||||
func (proxier *Proxier) Sync() {
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.QueuedUpdate()
|
||||
}
|
||||
proxier.syncRunner.Run()
|
||||
}
|
||||
|
||||
@ -468,7 +472,7 @@ func (proxier *Proxier) Sync() {
|
||||
func (proxier *Proxier) SyncLoop() {
|
||||
// Update healthz timestamp at beginning in case Sync() never succeeds.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
proxier.syncRunner.Loop(wait.NeverStop)
|
||||
}
|
||||
@ -495,7 +499,7 @@ func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
|
||||
// service object is observed.
|
||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
|
||||
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1449,19 +1453,18 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
proxier.portsMap = replacementPortsMap
|
||||
|
||||
// Update healthz timestamp.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
|
||||
|
||||
// Update healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the healthChecker
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck services: %v", err)
|
||||
}
|
||||
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
|
||||
}
|
||||
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing"
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
@ -342,28 +343,6 @@ func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Close
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type fakeHealthChecker struct {
|
||||
services map[types.NamespacedName]uint16
|
||||
endpoints map[types.NamespacedName]int
|
||||
}
|
||||
|
||||
func newFakeHealthChecker() *fakeHealthChecker {
|
||||
return &fakeHealthChecker{
|
||||
services: map[types.NamespacedName]uint16{},
|
||||
endpoints: map[types.NamespacedName]int{},
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
|
||||
fake.services = newServices
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
|
||||
fake.endpoints = newEndpoints
|
||||
return nil
|
||||
}
|
||||
|
||||
const testHostname = "test-hostname"
|
||||
|
||||
func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier {
|
||||
@ -380,7 +359,7 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
|
||||
healthChecker: newFakeHealthChecker(),
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
precomputedProbabilities: make([]string, 0, 1001),
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
existingFilterChainsData: bytes.NewBuffer(nil),
|
||||
|
@ -16,6 +16,7 @@ go_test(
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//pkg/proxy/ipvs/testing:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
"//pkg/proxy/util/testing:go_default_library",
|
||||
|
@ -227,9 +227,11 @@ type Proxier struct {
|
||||
nodeIP net.IP
|
||||
portMapper utilproxy.PortOpener
|
||||
recorder record.EventRecorder
|
||||
healthChecker healthcheck.Server
|
||||
healthzServer healthcheck.HealthzUpdater
|
||||
ipvsScheduler string
|
||||
|
||||
serviceHealthServer healthcheck.ServiceHealthServer
|
||||
healthzServer healthcheck.ProxierHealthUpdater
|
||||
|
||||
ipvsScheduler string
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
ipGetter IPGetter
|
||||
// The following buffers are used to reuse memory and avoid allocations
|
||||
@ -328,7 +330,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
hostname string,
|
||||
nodeIP net.IP,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
healthzServer healthcheck.ProxierHealthUpdater,
|
||||
scheduler string,
|
||||
nodePortAddresses []string,
|
||||
) (*Proxier, error) {
|
||||
@ -421,7 +423,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
scheduler = DefaultScheduler
|
||||
}
|
||||
|
||||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
|
||||
|
||||
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
|
||||
|
||||
@ -443,7 +445,7 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &listenPortOpener{},
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
serviceHealthServer: serviceHealthServer,
|
||||
healthzServer: healthzServer,
|
||||
ipvs: ipvs,
|
||||
ipvsScheduler: scheduler,
|
||||
@ -489,7 +491,7 @@ func NewDualStackProxier(
|
||||
hostname string,
|
||||
nodeIP [2]net.IP,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
healthzServer healthcheck.ProxierHealthUpdater,
|
||||
scheduler string,
|
||||
nodePortAddresses []string,
|
||||
) (proxy.Provider, error) {
|
||||
@ -775,6 +777,9 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
|
||||
|
||||
// Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
|
||||
func (proxier *Proxier) Sync() {
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.QueuedUpdate()
|
||||
}
|
||||
proxier.syncRunner.Run()
|
||||
}
|
||||
|
||||
@ -782,7 +787,7 @@ func (proxier *Proxier) Sync() {
|
||||
func (proxier *Proxier) SyncLoop() {
|
||||
// Update healthz timestamp at beginning in case Sync() never succeeds.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
proxier.syncRunner.Loop(wait.NeverStop)
|
||||
}
|
||||
@ -807,7 +812,7 @@ func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
|
||||
// OnServiceUpdate is called whenever modification of an existing service object is observed.
|
||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
|
||||
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -839,7 +844,7 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
||||
// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
|
||||
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
||||
if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1489,19 +1494,18 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
|
||||
|
||||
// Update healthz timestamp
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
|
||||
|
||||
// Update healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the healthChecker
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck services: %v", err)
|
||||
}
|
||||
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
|
||||
@ -59,18 +60,6 @@ func (f *fakeIPGetter) NodeIPs() ([]net.IP, error) {
|
||||
return f.nodeIPs, nil
|
||||
}
|
||||
|
||||
type fakeHealthChecker struct {
|
||||
services map[types.NamespacedName]uint16
|
||||
Endpoints map[types.NamespacedName]int
|
||||
}
|
||||
|
||||
func newFakeHealthChecker() *fakeHealthChecker {
|
||||
return &fakeHealthChecker{
|
||||
services: map[types.NamespacedName]uint16{},
|
||||
Endpoints: map[types.NamespacedName]int{},
|
||||
}
|
||||
}
|
||||
|
||||
// fakePortOpener implements portOpener.
|
||||
type fakePortOpener struct {
|
||||
openPorts []*utilproxy.LocalPort
|
||||
@ -83,16 +72,6 @@ func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Close
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
|
||||
fake.services = newServices
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
|
||||
fake.Endpoints = newEndpoints
|
||||
return nil
|
||||
}
|
||||
|
||||
// fakeKernelHandler implements KernelHandler.
|
||||
type fakeKernelHandler struct {
|
||||
modules []string
|
||||
@ -151,7 +130,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
|
||||
healthChecker: newFakeHealthChecker(),
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
ipvsScheduler: DefaultScheduler,
|
||||
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
|
@ -62,6 +62,7 @@ go_test(
|
||||
deps = select({
|
||||
"@io_bazel_rules_go//go/platform:windows": [
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
|
@ -471,8 +471,9 @@ type Proxier struct {
|
||||
hostname string
|
||||
nodeIP net.IP
|
||||
recorder record.EventRecorder
|
||||
healthChecker healthcheck.Server
|
||||
healthzServer healthcheck.HealthzUpdater
|
||||
|
||||
serviceHealthServer healthcheck.ServiceHealthServer
|
||||
healthzServer healthcheck.ProxierHealthUpdater
|
||||
|
||||
// Since converting probabilities (floats) to strings is expensive
|
||||
// and we are using only probabilities in the format of 1/n, we are
|
||||
@ -527,7 +528,7 @@ func NewProxier(
|
||||
hostname string,
|
||||
nodeIP net.IP,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
healthzServer healthcheck.ProxierHealthUpdater,
|
||||
config config.KubeProxyWinkernelConfiguration,
|
||||
) (*Proxier, error) {
|
||||
masqueradeValue := 1 << uint(masqueradeBit)
|
||||
@ -542,7 +543,7 @@ func NewProxier(
|
||||
klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
|
||||
}
|
||||
|
||||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
|
||||
var hns HostNetworkService
|
||||
hns = hnsV1{}
|
||||
supportedFeatures := hcn.GetSupportedFeatures()
|
||||
@ -622,24 +623,24 @@ func NewProxier(
|
||||
}
|
||||
|
||||
proxier := &Proxier{
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
masqueradeAll: masqueradeAll,
|
||||
masqueradeMark: masqueradeMark,
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
healthzServer: healthzServer,
|
||||
hns: hns,
|
||||
network: *hnsNetworkInfo,
|
||||
sourceVip: sourceVip,
|
||||
hostMac: hostMac,
|
||||
isDSR: isDSR,
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
masqueradeAll: masqueradeAll,
|
||||
masqueradeMark: masqueradeMark,
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
recorder: recorder,
|
||||
serviceHealthServer: serviceHealthServer,
|
||||
healthzServer: healthzServer,
|
||||
hns: hns,
|
||||
network: *hnsNetworkInfo,
|
||||
sourceVip: sourceVip,
|
||||
hostMac: hostMac,
|
||||
isDSR: isDSR,
|
||||
}
|
||||
|
||||
burstSyncs := 2
|
||||
@ -725,6 +726,9 @@ func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
|
||||
|
||||
// Sync is called to synchronize the proxier state to hns as soon as possible.
|
||||
func (proxier *Proxier) Sync() {
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.QueuedUpdate()
|
||||
}
|
||||
proxier.syncRunner.Run()
|
||||
}
|
||||
|
||||
@ -732,7 +736,7 @@ func (proxier *Proxier) Sync() {
|
||||
func (proxier *Proxier) SyncLoop() {
|
||||
// Update healthz timestamp at beginning in case Sync() never succeeds.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
proxier.syncRunner.Loop(wait.NeverStop)
|
||||
}
|
||||
@ -752,21 +756,21 @@ func (proxier *Proxier) isInitialized() bool {
|
||||
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
if proxier.serviceChanges.update(&namespacedName, nil, service, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
if proxier.serviceChanges.update(&namespacedName, oldService, service, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
|
||||
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
if proxier.serviceChanges.update(&namespacedName, service, nil, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -827,21 +831,21 @@ func (proxier *Proxier) updateServiceMap() (result updateServiceMapResult) {
|
||||
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil, proxier.hns) && proxier.isInitialized() {
|
||||
proxier.syncRunner.Run()
|
||||
proxier.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1276,19 +1280,18 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
Log(svcInfo, "+++Policy Successfully applied for service +++", 2)
|
||||
}
|
||||
|
||||
// Update healthz timestamp.
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
proxier.healthzServer.Updated()
|
||||
}
|
||||
SyncProxyRulesLastTimestamp.SetToCurrentTime()
|
||||
|
||||
// Update healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the healthChecker
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.hcServices); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck services: %v", err)
|
||||
}
|
||||
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
|
||||
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
discovery "k8s.io/api/discovery/v1alpha1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
|
||||
"net"
|
||||
"strings"
|
||||
@ -39,27 +40,6 @@ const destinationPrefix = "192.168.2.0/24"
|
||||
const providerAddress = "10.0.0.3"
|
||||
const guid = "123ABC"
|
||||
|
||||
type fakeHealthChecker struct {
|
||||
services map[types.NamespacedName]uint16
|
||||
endpoints map[types.NamespacedName]int
|
||||
}
|
||||
|
||||
func newFakeHealthChecker() *fakeHealthChecker {
|
||||
return &fakeHealthChecker{
|
||||
services: map[types.NamespacedName]uint16{},
|
||||
endpoints: map[types.NamespacedName]int{},
|
||||
}
|
||||
}
|
||||
func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
|
||||
fake.services = newServices
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
|
||||
fake.endpoints = newEndpoints
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakeHNS struct{}
|
||||
|
||||
func newFakeHNS() *fakeHNS {
|
||||
@ -126,20 +106,20 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
|
||||
networkType: networkType,
|
||||
}
|
||||
proxier := &Proxier{
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: testHostName,
|
||||
nodeIP: nodeIP,
|
||||
healthChecker: newFakeHealthChecker(),
|
||||
network: *hnsNetworkInfo,
|
||||
sourceVip: sourceVip,
|
||||
hostMac: macAddress,
|
||||
isDSR: false,
|
||||
hns: newFakeHNS(),
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: testHostName,
|
||||
nodeIP: nodeIP,
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
network: *hnsNetworkInfo,
|
||||
sourceVip: sourceVip,
|
||||
hostMac: macAddress,
|
||||
isDSR: false,
|
||||
hns: newFakeHNS(),
|
||||
}
|
||||
return proxier
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user