Merge pull request #112046 from aojea/etcd_healthz

rate limit etcd healthcheck requests
Kubernetes Prow Robot 2022-09-12 12:01:27 -07:00 committed by GitHub
commit 0f37b31206
4 changed files with 311 additions and 13 deletions
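The change puts a golang.org/x/time/rate limiter in front of the etcd healthcheck: at most one probe is sent per half of the configured timeout, and throttled callers receive the cached error of the last probe that was actually sent. The sketch below is not part of the diff; it is a minimal, self-contained illustration of that pattern, with a hypothetical rateLimitedCheck helper and a generic probe function standing in for the etcd Get call. The real change additionally timestamps the cached error via atomicLastError so a late response cannot overwrite a newer result.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// rateLimitedCheck wraps probe so that at most one call per interval/2
// (burst 1) goes through; throttled calls return the cached last error.
// Illustrative helper only, not the upstream implementation.
func rateLimitedCheck(probe func() error, interval time.Duration) func() error {
	limiter := rate.NewLimiter(rate.Every(interval/2), 1)
	var mu sync.Mutex
	lastErr := errors.New("no probe sent yet")
	return func() error {
		if !limiter.Allow() {
			mu.Lock()
			defer mu.Unlock()
			return lastErr // throttled: reuse the last probe's result
		}
		err := probe()
		mu.Lock()
		lastErr = err
		mu.Unlock()
		return err
	}
}

func main() {
	probes := 0
	check := rateLimitedCheck(func() error {
		probes++
		return nil // pretend etcd answered
	}, 4*time.Second)

	for i := 0; i < 10; i++ {
		_ = check()
	}
	fmt.Printf("10 healthchecks, %d probes sent\n", probes) // expect 1
}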

View File

@@ -35,6 +35,7 @@ require (
golang.org/x/net v0.0.0-20220722155237-a158d28d115b
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/grpc v1.47.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/square/go-jose.v2 v2.2.2
@@ -109,7 +110,6 @@ require (
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/protobuf v1.28.1 // indirect

View File

@@ -35,6 +35,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
@@ -123,20 +124,42 @@ func newETCD3ReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func()
return newETCD3Check(c, timeout, stopCh)
}
// atomicLastError acts as a cache that atomically stores an error.
// The error is only updated if the provided timestamp is more recent
// than the timestamp of the currently stored error.
type atomicLastError struct {
mu sync.RWMutex
err error
timestamp time.Time
}
func (a *atomicLastError) Store(err error, t time.Time) {
a.mu.Lock()
defer a.mu.Unlock()
if a.timestamp.IsZero() || a.timestamp.Before(t) {
a.err = err
a.timestamp = t
}
}
func (a *atomicLastError) Load() error {
a.mu.RLock()
defer a.mu.RUnlock()
return a.err
}
func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
// constructing the etcd v3 client blocks and times out if etcd is not available.
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
lock := sync.Mutex{}
lock := sync.RWMutex{}
var client *clientv3.Client
clientErr := fmt.Errorf("etcd client connection not yet established")
go wait.PollUntil(time.Second, func() (bool, error) {
newClient, err := newETCD3Client(c.Transport)
lock.Lock()
defer lock.Unlock()
// Ensure that the server is not already shutting down.
select {
case <-stopCh:
@@ -146,7 +169,6 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
return true, nil
default:
}
if err != nil {
clientErr = err
return false, nil
@@ -169,25 +191,37 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
}
}()
// limit to one request every half of the configured timeout, with a maximum burst of one
// rate-limited requests receive the error of the last request actually sent (note: not the last response received)
limiter := rate.NewLimiter(rate.Every(timeout/2), 1)
// initial state is the clientErr
lastError := &atomicLastError{err: fmt.Errorf("etcd client connection not yet established")}
return func() error {
// Given that the client is closed on shutdown, we hold the lock for
// the entire duration of the healthcheck call to ensure that the client
// will not be closed during the healthcheck.
// Given that healthchecks have a 2s timeout, blocking shutdown for an
// additional 2s in the worst case seems acceptable.
lock.Lock()
defer lock.Unlock()
lock.RLock()
defer lock.RUnlock()
if clientErr != nil {
return clientErr
}
if !limiter.Allow() {
return lastError.Load()
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
now := time.Now()
_, err := client.Get(ctx, path.Join("/", c.Prefix, "health"))
if err == nil {
return nil
if err != nil {
err = fmt.Errorf("error getting data from etcd: %w", err)
}
return fmt.Errorf("error getting data from etcd: %w", err)
lastError.Store(err, now)
return err
}, nil
}

View File

@@ -0,0 +1,48 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"errors"
"fmt"
"testing"
"time"
)
func Test_atomicLastError(t *testing.T) {
aError := &atomicLastError{err: fmt.Errorf("initial error")}
// a zero timestamp is always updated
aError.Store(errors.New("updated error"), time.Time{})
err := aError.Load()
if err.Error() != "updated error" {
t.Fatalf("Expected: \"updated error\" got: %s", err.Error())
}
// update to current time
now := time.Now()
aError.Store(errors.New("now error"), now)
err = aError.Load()
if err.Error() != "now error" {
t.Fatalf("Expected: \"now error\" got: %s", err.Error())
}
// no update to past time
past := now.Add(-5 * time.Second)
aError.Store(errors.New("past error"), past)
err = aError.Load()
if err.Error() != "now error" {
t.Fatalf("Expected: \"now error\" got: %s", err.Error())
}
}

View File

@@ -19,6 +19,10 @@ package factory
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -105,8 +109,10 @@ func TestCreateHealthcheck(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
tc.cfg.Transport.ServerList = client.Endpoints()
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
select {
@@ -121,13 +127,14 @@ func TestCreateHealthcheck(t *testing.T) {
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateHealthCheck(tc.cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
time.Sleep(2 * time.Second)
<-ready
got := healthcheck()
if !errors.Is(got, tc.want) {
@@ -202,8 +209,10 @@ func TestCreateReadycheck(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
tc.cfg.Transport.ServerList = client.Endpoints()
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
select {
@@ -218,12 +227,14 @@ func TestCreateReadycheck(t *testing.T) {
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateReadyCheck(tc.cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
time.Sleep(2 * time.Second)
<-ready
got := healthcheck()
@@ -233,3 +244,208 @@ func TestCreateReadycheck(t *testing.T) {
})
}
}
func TestRateLimitHealthcheck(t *testing.T) {
etcdConfig := testserver.NewTestConfig(t)
client := testserver.RunEtcd(t, etcdConfig)
newETCD3ClientFn := newETCD3Client
defer func() {
newETCD3Client = newETCD3ClientFn
}()
cfg := storagebackend.Config{
Type: storagebackend.StorageTypeETCD3,
Transport: storagebackend.TransportConfig{},
HealthcheckTimeout: 5 * time.Second,
}
cfg.Transport.ServerList = client.Endpoints()
tests := []struct {
name string
want error
}{
{
name: "etcd ok",
},
{
name: "etcd down",
want: errors.New("etcd down"),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
atomic.AddUint64(&counter, 1)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, tc.want
}
},
}
client.KV = dummyKV
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateHealthCheck(cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
<-ready
// run a first request to obtain the state
err = healthcheck()
if !errors.Is(err, tc.want) {
t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
}
// run multiple requests in parallel; they should observe the same state as the first one
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := healthcheck()
if !errors.Is(err, tc.want) {
t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
}
}()
}
// check the counter once the requests have finished
wg.Wait()
if counter != 1 {
t.Errorf("healthcheck() called etcd %d times, expected only one call", counter)
}
// wait until the rate limiter allows new requests
time.Sleep(cfg.HealthcheckTimeout / 2)
// a new round of requests should increment the counter only once
// run multiple requests in parallel; they should observe the same state as the first one
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := healthcheck()
if !errors.Is(err, tc.want) {
t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
}
}()
}
wg.Wait()
if counter != 2 {
t.Errorf("healthcheck() called etcd %d times, expected only two calls", counter)
}
})
}
}
func TestTimeTravelHealthcheck(t *testing.T) {
etcdConfig := testserver.NewTestConfig(t)
client := testserver.RunEtcd(t, etcdConfig)
newETCD3ClientFn := newETCD3Client
defer func() {
newETCD3Client = newETCD3ClientFn
}()
cfg := storagebackend.Config{
Type: storagebackend.StorageTypeETCD3,
Transport: storagebackend.TransportConfig{},
HealthcheckTimeout: 5 * time.Second,
}
cfg.Transport.ServerList = client.Endpoints()
ready := make(chan struct{})
signal := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
atomic.AddUint64(&counter, 1)
val := atomic.LoadUint64(&counter)
// The first request waits for a custom timeout to trigger an error.
// We don't use the context timeout because we want to check that
// the cached answer is not overridden, and since the rate limit is
// based on cfg.HealthcheckTimeout / 2, the timeout will race with
// the rate limiter to either serve the new request from the cache or
// allow it to go through.
if val == 1 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After((2 * cfg.HealthcheckTimeout) / 3):
return nil, fmt.Errorf("etcd down")
}
}
// subsequent requests will always work
return nil, nil
},
}
client.KV = dummyKV
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateHealthCheck(cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
<-ready
// run a first request that fails after 2/3 of the healthcheck timeout
go func() {
err := healthcheck()
if !strings.Contains(err.Error(), "etcd down") {
t.Errorf("healthcheck() mismatch want %v got %v", fmt.Errorf("etcd down"), err)
}
close(signal)
}()
// wait until the rate limiter allows new requests
time.Sleep(cfg.HealthcheckTimeout / 2)
select {
case <-signal:
t.Errorf("first request should not return yet")
default:
}
// a new request should succeed and increment the counter
err = healthcheck()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
c := atomic.LoadUint64(&counter)
if c != 2 {
t.Errorf("healthcheck() called etcd %d times, expected only two calls", c)
}
// the cached result should be a success and must not be overridden by the late error
<-signal
err = healthcheck()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
c = atomic.LoadUint64(&counter)
if c != 2 {
t.Errorf("healthcheck() called etcd %d times, expected only two calls", c)
}
}