Merge pull request #118146 from aroradaman/fix/proxy-healthzserver

proxy healthz server for dualstack clusters
Authored by Kubernetes Prow Robot on 2023-10-16 21:19:25 +02:00, committed by GitHub
commit b5ba899dfa
8 changed files with 217 additions and 174 deletions
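
In short, this commit replaces the ProxierHealthUpdater interface with a concrete *healthcheck.ProxierHealthServer that tracks QueuedUpdate/Updated timestamps per IP family (v1.IPv4Protocol, v1.IPv6Protocol), so on a dual-stack node /healthz reports 200 only while both proxiers are keeping up. Event recording moves out of the health server and back into the caller, and the Windows proxier now receives its IP family explicitly instead of deriving it from the node IP. A minimal sketch of the resulting call pattern, assuming the post-merge API (illustrative wiring, not code from this PR):

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/proxy/healthcheck"
)

func main() {
    // One shared health server; each per-family proxier reports into it.
    // kube-proxy passes 2*SyncPeriod as the health timeout.
    hs := healthcheck.NewProxierHealthServer("0.0.0.0:10256", 2*time.Minute)
    go func() { _ = hs.Run() }() // serves /healthz and /livez

    for _, family := range []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol} {
        hs.QueuedUpdate(family) // a rule update is pending for this family
        // ... synchronize the dataplane for this family ...
        hs.Updated(family) // this family has caught up again
    }
    // Healthy only while no family has a pending update older than the timeout.
    fmt.Println("healthy:", hs.IsHealthy())
}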


@@ -578,7 +578,7 @@ type ProxyServer struct {
 	Broadcaster events.EventBroadcaster
 	Recorder events.EventRecorder
 	NodeRef *v1.ObjectReference
-	HealthzServer healthcheck.ProxierHealthUpdater
+	HealthzServer *healthcheck.ProxierHealthServer
 	Hostname string
 	PrimaryIPFamily v1.IPFamily
 	NodeIPs map[v1.IPFamily]net.IP
@@ -626,7 +626,7 @@ func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master strin
 	}
 	if len(config.HealthzBindAddress) > 0 {
-		s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, s.Recorder, s.NodeRef)
+		s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
 	}
 	err = s.platformSetup()
@@ -787,7 +787,7 @@ func createClient(config componentbaseconfig.ClientConnectionConfiguration, mast
 	return client, nil
 }
-func serveHealthz(hz healthcheck.ProxierHealthUpdater, errCh chan error) {
+func serveHealthz(hz *healthcheck.ProxierHealthServer, errCh chan error) {
 	if hz == nil {
 		return
 	}
@@ -873,16 +873,17 @@ func (s *ProxyServer) Run() error {
 	// TODO(thockin): make it possible for healthz and metrics to be on the same port.
-	var errCh chan error
+	var healthzErrCh, metricsErrCh chan error
 	if s.Config.BindAddressHardFail {
-		errCh = make(chan error)
+		healthzErrCh = make(chan error)
+		metricsErrCh = make(chan error)
 	}
 	// Start up a healthz server if requested
-	serveHealthz(s.HealthzServer, errCh)
+	serveHealthz(s.HealthzServer, healthzErrCh)
 	// Start up a metrics server if requested
-	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, errCh)
+	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
 	noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
 	if err != nil {
@@ -947,7 +948,13 @@ func (s *ProxyServer) Run() error {
 	go s.Proxier.SyncLoop()
-	return <-errCh
+	select {
+	case err = <-healthzErrCh:
+		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", err.Error())
+	case err = <-metricsErrCh:
+		s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartMetricServer", "StartKubeProxy", err.Error())
+	}
+	return err
 }
 func (s *ProxyServer) birthCry() {
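
The reworked Run() above splits the single errCh into healthzErrCh and metricsErrCh so a failure can be attributed and recorded as a distinct event (FailedToStartProxierHealthcheck vs FailedToStartMetricServer, now emitted by the caller instead of the health server). It leans on a Go property worth noting: a receive from a nil channel blocks forever, so with BindAddressHardFail unset both channels stay nil and the final select never fires. A distilled sketch of that shape, with stand-in names rather than the real function (event recording omitted):

package sketch

// A nil error channel means "failures are non-fatal": the serve helpers
// only report into a non-nil channel, and <-nil blocks forever, so the
// select below can only return when hardFail enabled the channels.
func runServers(hardFail bool, serveHealthz, serveMetrics func(chan error)) error {
    var healthzErrCh, metricsErrCh chan error
    if hardFail {
        healthzErrCh = make(chan error)
        metricsErrCh = make(chan error)
    }
    serveHealthz(healthzErrCh)
    serveMetrics(metricsErrCh)
    select {
    case err := <-healthzErrCh:
        return err // the caller can attribute the failure to healthz
    case err := <-metricsErrCh:
        return err // ... or to the metrics server
    }
}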


@@ -103,6 +103,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
 		)
 	} else {
 		proxier, err = winkernel.NewProxier(
+			s.PrimaryIPFamily,
 			config.IPTables.SyncPeriod.Duration,
 			config.IPTables.MinSyncPeriod.Duration,
 			config.ClusterCIDR,


@@ -470,7 +470,7 @@ func TestHealthzServer(t *testing.T) {
 	httpFactory := newFakeHTTPServerFactory()
 	fakeClock := testingclock.NewFakeClock(time.Now())
-	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
+	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
 	server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
 	hsTest := &serverTest{
@@ -480,26 +480,7 @@ func TestHealthzServer(t *testing.T) {
 		tracking503: 0,
 	}
-	// Should return 200 "OK" by default.
-	testHTTPHandler(hsTest, http.StatusOK, t)
-	// Should return 200 "OK" after first update
-	hs.Updated()
-	testHTTPHandler(hsTest, http.StatusOK, t)
-	// Should continue to return 200 "OK" as long as no further updates are queued
-	fakeClock.Step(25 * time.Second)
-	testHTTPHandler(hsTest, http.StatusOK, t)
-	// Should return 503 "ServiceUnavailable" if we exceed the max update-processing time
-	hs.QueuedUpdate()
-	fakeClock.Step(25 * time.Second)
-	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
-	// Should return 200 "OK" after processing the update
-	hs.Updated()
-	fakeClock.Step(5 * time.Second)
-	testHTTPHandler(hsTest, http.StatusOK, t)
+	testProxierHealthUpdater(hs, hsTest, fakeClock, t)
 	// Should return 200 "OK" if we've synced a node tainted in any other way
 	hs.SyncNode(makeNode(tweakTainted("other")))
@@ -524,7 +505,7 @@ func TestLivezServer(t *testing.T) {
 	httpFactory := newFakeHTTPServerFactory()
 	fakeClock := testingclock.NewFakeClock(time.Now())
-	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
+	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
 	server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
 	hsTest := &serverTest{
@@ -534,26 +515,7 @@ func TestLivezServer(t *testing.T) {
 		tracking503: 0,
 	}
-	// Should return 200 "OK" by default.
-	testHTTPHandler(hsTest, http.StatusOK, t)
-	// Should return 200 "OK" after first update
-	hs.Updated()
-	testHTTPHandler(hsTest, http.StatusOK, t)
-	// Should continue to return 200 "OK" as long as no further updates are queued
-	fakeClock.Step(25 * time.Second)
-	testHTTPHandler(hsTest, http.StatusOK, t)
-	// Should return 503 "ServiceUnavailable" if we exceed the max update-processing time
-	hs.QueuedUpdate()
-	fakeClock.Step(25 * time.Second)
-	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
-	// Should return 200 "OK" after processing the update
-	hs.Updated()
-	fakeClock.Step(5 * time.Second)
-	testHTTPHandler(hsTest, http.StatusOK, t)
+	testProxierHealthUpdater(hs, hsTest, fakeClock, t)
 	// Should return 200 "OK" irrespective of node syncs
 	hs.SyncNode(makeNode(tweakTainted("other")))
@@ -579,6 +541,77 @@ var (
 	livezURL url = "/livez"
 )
+func testProxierHealthUpdater(hs *ProxierHealthServer, hsTest *serverTest, fakeClock *testingclock.FakeClock, t *testing.T) {
+	// Should return 200 "OK" by default.
+	testHTTPHandler(hsTest, http.StatusOK, t)
+	// Should return 200 "OK" after the first update for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+	// Should continue to return 200 "OK" as long as no further updates are queued for any proxier.
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+	// Should return 503 "ServiceUnavailable" if the IPv4 proxier exceeds the max update-processing time.
+	hs.QueuedUpdate(v1.IPv4Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+	// Should return 200 "OK" after processing the update for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+	// Should return 503 "ServiceUnavailable" if the IPv6 proxier exceeds the max update-processing time.
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+	// Should return 200 "OK" after processing the update for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+	// Should return 503 "ServiceUnavailable" if both IPv4 and IPv6 proxiers exceed the max update-processing time.
+	hs.QueuedUpdate(v1.IPv4Protocol)
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+	// Should return 200 "OK" after processing the update for both IPv4 and IPv6 proxiers.
+	hs.Updated(v1.IPv4Protocol)
+	hs.Updated(v1.IPv6Protocol)
+	fakeClock.Step(5 * time.Second)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+	// If the IPv6 proxier is late for an update but the IPv4 proxier is not, then updating the IPv4 proxier should have no effect.
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+	hs.Updated(v1.IPv4Protocol)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+	hs.Updated(v1.IPv6Protocol)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+	// If both IPv4 and IPv6 proxiers are late for an update, we shouldn't report 200 "OK" until after both of them update.
+	hs.QueuedUpdate(v1.IPv4Protocol)
+	hs.QueuedUpdate(v1.IPv6Protocol)
+	fakeClock.Step(25 * time.Second)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+	hs.Updated(v1.IPv4Protocol)
+	testHTTPHandler(hsTest, http.StatusServiceUnavailable, t)
+	hs.Updated(v1.IPv6Protocol)
+	testHTTPHandler(hsTest, http.StatusOK, t)
+}
 func testHTTPHandler(hsTest *serverTest, status int, t *testing.T) {
 	handler := hsTest.server.(*fakeHTTPServer).handler
 	req, err := http.NewRequest("GET", string(hsTest.url), nil)


@@ -19,13 +19,11 @@ package healthcheck
 import (
 	"fmt"
 	"net/http"
-	"sync/atomic"
+	"sync"
 	"time"
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/client-go/tools/events"
 	"k8s.io/klog/v2"
-	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/proxy/metrics"
 	"k8s.io/utils/clock"
 )
@@ -36,129 +34,135 @@ const (
 	ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
 )
-// ProxierHealthUpdater allows callers to update healthz timestamp only.
-type ProxierHealthUpdater interface {
-	// QueuedUpdate should be called when the proxier receives a Service or Endpoints
-	// event containing information that requires updating service rules.
-	QueuedUpdate()
-	// Updated should be called when the proxier has successfully updated the service
-	// rules to reflect the current state.
-	Updated()
-	// Run starts the healthz HTTP server and blocks until it exits.
-	Run() error
-	// Sync the node and determine if it's eligible or not. Eligible is
-	// defined as being: not tainted by ToBeDeletedTaint and not deleted.
-	SyncNode(node *v1.Node)
-	proxierHealthChecker
-}
-var _ ProxierHealthUpdater = &proxierHealthServer{}
-var zeroTime = time.Time{}
-// proxierHealthServer returns 200 "OK" by default. It verifies that the delay between
-// QueuedUpdate() calls and Updated() calls never exceeds healthTimeout.
-type proxierHealthServer struct {
+// ProxierHealthServer allows callers to:
+// 1. run an HTTP server with /healthz and /livez endpoint handlers.
+// 2. update healthz timestamps before and after synchronizing the dataplane.
+// 3. sync node status, for reporting an unhealthy /healthz response
+//    if the node is marked for deletion by the autoscaler.
+// 4. get proxy health by verifying whether the delay between QueuedUpdate()
+//    calls and Updated() calls exceeds healthTimeout.
+type ProxierHealthServer struct {
 	listener listener
 	httpFactory httpServerFactory
 	clock clock.Clock
 	addr string
 	healthTimeout time.Duration
-	recorder events.EventRecorder
-	nodeRef *v1.ObjectReference
-	lastUpdated atomic.Value
-	oldestPendingQueued atomic.Value
-	nodeEligible atomic.Bool
+	lock sync.RWMutex
+	lastUpdatedMap map[v1.IPFamily]time.Time
+	oldestPendingQueuedMap map[v1.IPFamily]time.Time
+	nodeEligible bool
 }
 // NewProxierHealthServer returns a proxier health http server.
-func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) ProxierHealthUpdater {
-	return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, recorder, nodeRef)
+func NewProxierHealthServer(addr string, healthTimeout time.Duration) *ProxierHealthServer {
+	return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
 }
-func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer {
-	hs := &proxierHealthServer{
+func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *ProxierHealthServer {
+	return &ProxierHealthServer{
 		listener: listener,
 		httpFactory: httpServerFactory,
 		clock: c,
 		addr: addr,
 		healthTimeout: healthTimeout,
-		recorder: recorder,
-		nodeRef: nodeRef,
-	}
+		lastUpdatedMap: make(map[v1.IPFamily]time.Time),
+		oldestPendingQueuedMap: make(map[v1.IPFamily]time.Time),
 	// The node is eligible (and thus the proxy healthy) while it's starting up
 	// and until we've processed the first node event that indicates the
 	// contrary.
-	hs.nodeEligible.Store(true)
-	return hs
+		nodeEligible: true,
+	}
 }
-// Updated indicates that kube-proxy has successfully updated its backend, so it should
-// be considered healthy now.
-func (hs *proxierHealthServer) Updated() {
-	hs.oldestPendingQueued.Store(zeroTime)
-	hs.lastUpdated.Store(hs.clock.Now())
+// Updated should be called when the proxier of the given IP family has successfully updated
+// the service rules to reflect the current state and should be considered healthy now.
+func (hs *ProxierHealthServer) Updated(ipFamily v1.IPFamily) {
+	hs.lock.Lock()
+	defer hs.lock.Unlock()
+	delete(hs.oldestPendingQueuedMap, ipFamily)
+	hs.lastUpdatedMap[ipFamily] = hs.clock.Now()
 }
-// QueuedUpdate indicates that the proxy has received changes from the apiserver but
-// has not yet pushed them to its backend. If the proxy does not call Updated within the
+// QueuedUpdate should be called when the proxier receives a Service or Endpoints event
+// from the API server containing information that requires updating service rules. It
+// indicates that the proxier for the given IP family has received changes but has not
+// yet pushed them to its backend. If the proxier does not call Updated within the
 // healthTimeout time then it will be considered unhealthy.
-func (hs *proxierHealthServer) QueuedUpdate() {
-	// Set oldestPendingQueued only if it's currently zero
-	hs.oldestPendingQueued.CompareAndSwap(zeroTime, hs.clock.Now())
+func (hs *ProxierHealthServer) QueuedUpdate(ipFamily v1.IPFamily) {
+	hs.lock.Lock()
+	defer hs.lock.Unlock()
+	// Set oldestPendingQueuedMap[ipFamily] only if it's currently unset
+	if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
+		hs.oldestPendingQueuedMap[ipFamily] = hs.clock.Now()
+	}
 }
 // IsHealthy returns only the proxier's health state, following the same
 // definition the HTTP server defines, but ignoring the state of the Node.
-func (hs *proxierHealthServer) IsHealthy() bool {
-	isHealthy, _, _ := hs.isHealthy()
+func (hs *ProxierHealthServer) IsHealthy() bool {
+	isHealthy, _ := hs.isHealthy()
 	return isHealthy
 }
-func (hs *proxierHealthServer) isHealthy() (bool, time.Time, time.Time) {
-	var oldestPendingQueued, lastUpdated time.Time
-	if val := hs.oldestPendingQueued.Load(); val != nil {
-		oldestPendingQueued = val.(time.Time)
-	}
-	if val := hs.lastUpdated.Load(); val != nil {
-		lastUpdated = val.(time.Time)
-	}
+func (hs *ProxierHealthServer) isHealthy() (bool, time.Time) {
+	hs.lock.RLock()
+	defer hs.lock.RUnlock()
+	var lastUpdated time.Time
 	currentTime := hs.clock.Now()
-	healthy := false
-	switch {
-	case oldestPendingQueued.IsZero():
-		// The proxy is healthy while it's starting up
-		// or the proxy is fully synced.
-		healthy = true
-	case currentTime.Sub(oldestPendingQueued) < hs.healthTimeout:
-		// There's an unprocessed update queued, but it's not late yet
-		healthy = true
-	}
-	return healthy, lastUpdated, currentTime
+	for ipFamily, proxierLastUpdated := range hs.lastUpdatedMap {
+		if proxierLastUpdated.After(lastUpdated) {
+			lastUpdated = proxierLastUpdated
+		}
+		if _, set := hs.oldestPendingQueuedMap[ipFamily]; !set {
+			// the proxier is healthy while it's starting up
+			// or the proxier is fully synced.
+			continue
+		}
+		if currentTime.Sub(hs.oldestPendingQueuedMap[ipFamily]) < hs.healthTimeout {
			// there's an unprocessed update queued for this proxier, but it's not late yet.
+			continue
+		}
+		return false, proxierLastUpdated
+	}
+	return true, lastUpdated
 }
-func (hs *proxierHealthServer) SyncNode(node *v1.Node) {
+// SyncNode syncs the node and determines if it is eligible or not. Eligible is
+// defined as being: not tainted by ToBeDeletedTaint and not deleted.
+func (hs *ProxierHealthServer) SyncNode(node *v1.Node) {
+	hs.lock.Lock()
+	defer hs.lock.Unlock()
 	if !node.DeletionTimestamp.IsZero() {
-		hs.nodeEligible.Store(false)
+		hs.nodeEligible = false
 		return
 	}
 	for _, taint := range node.Spec.Taints {
 		if taint.Key == ToBeDeletedTaint {
-			hs.nodeEligible.Store(false)
+			hs.nodeEligible = false
 			return
 		}
 	}
-	hs.nodeEligible.Store(true)
+	hs.nodeEligible = true
 }
+// NodeEligible returns the nodeEligible field of ProxierHealthServer.
+func (hs *ProxierHealthServer) NodeEligible() bool {
+	hs.lock.RLock()
+	defer hs.lock.RUnlock()
+	return hs.nodeEligible
+}
 // Run starts the healthz HTTP server and blocks until it exits.
-func (hs *proxierHealthServer) Run() error {
+func (hs *ProxierHealthServer) Run() error {
 	serveMux := http.NewServeMux()
 	serveMux.Handle("/healthz", healthzHandler{hs: hs})
 	serveMux.Handle("/livez", livezHandler{hs: hs})
@@ -166,12 +170,7 @@ func (hs *proxierHealthServer) Run() error {
 	listener, err := hs.listener.Listen(hs.addr)
 	if err != nil {
-		msg := fmt.Sprintf("failed to start proxier healthz on %s: %v", hs.addr, err)
-		// TODO(thockin): move eventing back to caller
-		if hs.recorder != nil {
-			hs.recorder.Eventf(hs.nodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", msg)
-		}
-		return fmt.Errorf("%v", msg)
+		return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err)
 	}
 	klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addr)
@@ -183,12 +182,14 @@ func (hs *proxierHealthServer) Run() error {
 }
 type healthzHandler struct {
-	hs *proxierHealthServer
+	hs *ProxierHealthServer
 }
 func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	nodeEligible := h.hs.nodeEligible.Load()
-	healthy, lastUpdated, currentTime := h.hs.isHealthy()
+	nodeEligible := h.hs.NodeEligible()
+	healthy, lastUpdated := h.hs.isHealthy()
+	currentTime := h.hs.clock.Now()
 	healthy = healthy && nodeEligible
 	resp.Header().Set("Content-Type", "application/json")
 	resp.Header().Set("X-Content-Type-Options", "nosniff")
@@ -209,11 +210,12 @@ func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 }
 type livezHandler struct {
-	hs *proxierHealthServer
+	hs *ProxierHealthServer
 }
 func (h livezHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	healthy, lastUpdated, currentTime := h.hs.isHealthy()
+	healthy, lastUpdated := h.hs.isHealthy()
+	currentTime := h.hs.clock.Now()
 	resp.Header().Set("Content-Type", "application/json")
 	resp.Header().Set("X-Content-Type-Options", "nosniff")
 	if !healthy {
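
Two things are easy to miss in the rewritten health logic above. First, /healthz and /livez now differ only in node eligibility: healthzHandler ANDs isHealthy() with NodeEligible(), while livezHandler ignores the node, so a node being drained by the cluster autoscaler fails /healthz (shifting load-balancer traffic away) but stays live. Second, the per-family bookkeeping means one late proxier is enough to turn the whole server unhealthy. A distilled restatement of the isHealthy loop, with the maps passed in explicitly (a sketch for illustration with names of my choosing, not the in-tree code):

package sketch

import (
    "time"

    v1 "k8s.io/api/core/v1"
)

// healthy reports whether every IP family that has completed at least one
// Updated() call is either fully synced (no pending entry) or has a pending
// update younger than timeout. Note that a family is only inspected once it
// appears in lastUpdated, mirroring the range over lastUpdatedMap above.
func healthy(now time.Time, timeout time.Duration,
    lastUpdated, pendingSince map[v1.IPFamily]time.Time) bool {
    for family := range lastUpdated {
        queuedAt, pending := pendingSince[family]
        if !pending {
            continue // starting up, or fully synced
        }
        if now.Sub(queuedAt) < timeout {
            continue // an update is queued, but not late yet
        }
        return false // this family missed its deadline
    }
    return true
}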


@@ -139,6 +139,9 @@ func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.Servic
 // Proxier is an iptables based proxy for connections between a localhost:lport
 // and services that provide the actual backends.
 type Proxier struct {
+	// ipFamily defines the IP family which this proxier is tracking.
+	ipFamily v1.IPFamily
 	// endpointsChanges and serviceChanges contains all changes to endpoints and
 	// services that happened since iptables was synced. For a single object,
 	// changes are accumulated, i.e. previous is state from before all of them,
@@ -172,7 +175,7 @@ type Proxier struct {
 	recorder events.EventRecorder
 	serviceHealthServer healthcheck.ServiceHealthServer
-	healthzServer healthcheck.ProxierHealthUpdater
+	healthzServer *healthcheck.ProxierHealthServer
 	// Since converting probabilities (floats) to strings is expensive
 	// and we are using only probabilities in the format of 1/n, we are
@@ -228,7 +231,7 @@ func NewProxier(ipFamily v1.IPFamily,
 	hostname string,
 	nodeIP net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	nodePortAddressStrings []string,
 ) (*Proxier, error) {
 	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
@@ -262,6 +265,7 @@ func NewProxier(ipFamily v1.IPFamily,
 	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
 	proxier := &Proxier{
+		ipFamily: ipFamily,
 		svcPortMap: make(proxy.ServicePortMap),
 		serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
 		endpointsMap: make(proxy.EndpointsMap),
@@ -324,7 +328,7 @@ func NewDualStackProxier(
 	hostname string,
 	nodeIPs map[v1.IPFamily]net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	nodePortAddresses []string,
 ) (proxy.Provider, error) {
 	// Create an ipv4 instance of the single-stack proxier
@@ -486,7 +490,7 @@ func (proxier *Proxier) probability(n int) string {
 // Sync is called to synchronize the proxier state to iptables as soon as possible.
 func (proxier *Proxier) Sync() {
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.QueuedUpdate()
+		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 	proxier.syncRunner.Run()
@@ -496,7 +500,7 @@ func (proxier *Proxier) Sync() {
 func (proxier *Proxier) SyncLoop() {
 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	// synthesize "last change queued" time as the informers are syncing.
@@ -1534,7 +1538,7 @@ func (proxier *Proxier) syncProxyRules() {
 	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
 	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
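
The same pairing recurs in the ipvs and winkernel sections below: Sync() marks the proxier's family dirty via QueuedUpdate, and syncProxyRules()/SyncLoop() mark it clean via Updated once the rules converge. Distilled, the contract every backend now follows per family (a sketch; the type and syncDataplane are stand-ins, not kube-proxy code):

package sketch

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/proxy/healthcheck"
)

type backend struct {
    ipFamily      v1.IPFamily
    healthzServer *healthcheck.ProxierHealthServer
}

// Sync runs on Service/Endpoints events: the family now has a pending
// update and its healthTimeout countdown starts.
func (b *backend) Sync() {
    if b.healthzServer != nil {
        b.healthzServer.QueuedUpdate(b.ipFamily)
    }
    // ... schedule the actual sync (kube-proxy uses a bounded-frequency runner) ...
}

// syncDataplane stands in for syncProxyRules: once the rules converge,
// the family is marked clean again.
func (b *backend) syncDataplane() {
    // ... program iptables/ipvs/hns rules here ...
    if b.healthzServer != nil {
        b.healthzServer.Updated(b.ipFamily)
    }
}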


@@ -267,7 +267,7 @@ type Proxier struct {
 	recorder events.EventRecorder
 	serviceHealthServer healthcheck.ServiceHealthServer
-	healthzServer healthcheck.ProxierHealthUpdater
+	healthzServer *healthcheck.ProxierHealthServer
 	ipvsScheduler string
 	// The following buffers are used to reuse memory and avoid allocations
@@ -336,7 +336,7 @@ func NewProxier(ipFamily v1.IPFamily,
 	hostname string,
 	nodeIP net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	scheduler string,
 	nodePortAddressStrings []string,
 	kernelHandler KernelHandler,
@@ -486,7 +486,7 @@ func NewDualStackProxier(
 	hostname string,
 	nodeIPs map[v1.IPFamily]net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	scheduler string,
 	nodePortAddresses []string,
 	kernelHandler KernelHandler,
@@ -760,7 +760,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
 // Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible.
 func (proxier *Proxier) Sync() {
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.QueuedUpdate()
+		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 	proxier.syncRunner.Run()
@@ -770,7 +770,7 @@ func (proxier *Proxier) Sync() {
 func (proxier *Proxier) SyncLoop() {
 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	// synthesize "last change queued" time as the informers are syncing.
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
@@ -1493,7 +1493,7 @@ func (proxier *Proxier) syncProxyRules() {
 	proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()


@@ -90,7 +90,7 @@ func (n *NodePodCIDRHandler) OnNodeSynced() {}
 // NodeEligibleHandler handles the life cycle of the Node's eligibility, as
 // determined by the health server for directing load balancer traffic.
 type NodeEligibleHandler struct {
-	HealthServer healthcheck.ProxierHealthUpdater
+	HealthServer *healthcheck.ProxierHealthServer
 }
 var _ config.NodeHandler = &NodeEligibleHandler{}


@@ -594,6 +594,8 @@ type endPointsReferenceCountMap map[string]*uint16
 // Proxier is an hns based proxy for connections between a localhost:lport
 // and services that provide the actual backends.
 type Proxier struct {
+	// ipFamily defines the IP family which this proxier is tracking.
+	ipFamily v1.IPFamily
 	// TODO(imroc): implement node handler for winkernel proxier.
 	proxyconfig.NoopNodeHandler
@@ -612,7 +614,6 @@ type Proxier struct {
 	// with some partial data after kube-proxy restart.
 	endpointSlicesSynced bool
 	servicesSynced bool
-	isIPv6Mode bool
 	initialized int32
 	syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
 	// These are effectively const and do not need the mutex to be held.
@@ -622,7 +623,7 @@ type Proxier struct {
 	recorder events.EventRecorder
 	serviceHealthServer healthcheck.ServiceHealthServer
-	healthzServer healthcheck.ProxierHealthUpdater
+	healthzServer *healthcheck.ProxierHealthServer
 	hns HostNetworkService
 	hcn HcnService
@@ -671,13 +672,14 @@ var _ proxy.Provider = &Proxier{}
 // NewProxier returns a new Proxier
 func NewProxier(
+	ipFamily v1.IPFamily,
 	syncPeriod time.Duration,
 	minSyncPeriod time.Duration,
 	clusterCIDR string,
 	hostname string,
 	nodeIP net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	config config.KubeProxyWinkernelConfiguration,
 	healthzPort int,
 ) (*Proxier, error) {
@@ -690,12 +692,6 @@ func NewProxier(
 		klog.InfoS("ClusterCIDR not specified, unable to distinguish between internal and external traffic")
 	}
-	isIPv6 := netutils.IsIPv6(nodeIP)
-	ipFamily := v1.IPv4Protocol
-	if isIPv6 {
-		ipFamily = v1.IPv6Protocol
-	}
 	// windows listens to all node addresses
 	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil)
 	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
@@ -778,6 +774,7 @@ func NewProxier(
 	}
 	proxier := &Proxier{
+		ipFamily: ipFamily,
 		endPointsRefCount: make(endPointsReferenceCountMap),
 		svcPortMap: make(proxy.ServicePortMap),
 		endpointsMap: make(proxy.EndpointsMap),
@@ -794,7 +791,6 @@ func NewProxier(
 		hostMac: hostMac,
 		isDSR: isDSR,
 		supportedFeatures: supportedFeatures,
-		isIPv6Mode: isIPv6,
 		healthzPort: healthzPort,
 		rootHnsEndpointName: config.RootHnsEndpointName,
 		forwardHealthCheckVip: config.ForwardHealthCheckVip,
@@ -819,13 +815,13 @@ func NewDualStackProxier(
 	hostname string,
 	nodeIPs map[v1.IPFamily]net.IP,
 	recorder events.EventRecorder,
-	healthzServer healthcheck.ProxierHealthUpdater,
+	healthzServer *healthcheck.ProxierHealthServer,
 	config config.KubeProxyWinkernelConfiguration,
 	healthzPort int,
 ) (proxy.Provider, error) {
 	// Create an ipv4 instance of the single-stack proxier
-	ipv4Proxier, err := NewProxier(syncPeriod, minSyncPeriod,
+	ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
 		clusterCIDR, hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer,
 		config, healthzPort)
@@ -833,7 +829,7 @@ func NewDualStackProxier(
 		return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIPs[v1.IPv4Protocol])
 	}
-	ipv6Proxier, err := NewProxier(syncPeriod, minSyncPeriod,
+	ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
 		clusterCIDR, hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer,
 		config, healthzPort)
 	if err != nil {
@@ -937,7 +933,7 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy(mapStaleLoadbalancer map[st
 // Sync is called to synchronize the proxier state to hns as soon as possible.
 func (proxier *Proxier) Sync() {
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.QueuedUpdate()
+		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 	proxier.syncRunner.Run()
@@ -947,7 +943,7 @@ func (proxier *Proxier) Sync() {
 func (proxier *Proxier) SyncLoop() {
 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	// synthesize "last change queued" time as the informers are syncing.
 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
@@ -1434,7 +1430,7 @@ func (proxier *Proxier) syncProxyRules() {
 	// Cluster IP LoadBalancer creation
 	hnsLoadBalancer, err := hns.getLoadBalancer(
 		clusterIPEndpoints,
-		loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP},
+		loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
 		sourceVip,
 		svcInfo.ClusterIP().String(),
 		Enum(svcInfo.Protocol()),
@@ -1469,7 +1465,7 @@ func (proxier *Proxier) syncProxyRules() {
 	// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
 	hnsLoadBalancer, err := hns.getLoadBalancer(
 		nodePortEndpoints,
-		loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
+		loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 		sourceVip,
 		"",
 		Enum(svcInfo.Protocol()),
@@ -1504,7 +1500,7 @@ func (proxier *Proxier) syncProxyRules() {
 	// Try loading existing policies, if already available
 	hnsLoadBalancer, err = hns.getLoadBalancer(
 		externalIPEndpoints,
-		loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
+		loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 		sourceVip,
 		externalIP.ip,
 		Enum(svcInfo.Protocol()),
@@ -1535,7 +1531,7 @@ func (proxier *Proxier) syncProxyRules() {
 	if len(lbIngressEndpoints) > 0 {
 		hnsLoadBalancer, err := hns.getLoadBalancer(
 			lbIngressEndpoints,
-			loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
+			loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
 			sourceVip,
 			lbIngressIP.ip,
 			Enum(svcInfo.Protocol()),
@@ -1587,7 +1583,7 @@ func (proxier *Proxier) syncProxyRules() {
 	}
 	if proxier.healthzServer != nil {
-		proxier.healthzServer.Updated()
+		proxier.healthzServer.Updated(proxier.ipFamily)
 	}
 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()