rate limit /healthz etcd healthchecks

return the last request error, instead of last error received
The rate limit allows 1 event per healthcheck timeout / 2
This commit is contained in:
Antonio Ojea 2022-08-29 11:09:58 +02:00
parent dd6d3d95cd
commit 510a85c53a
4 changed files with 298 additions and 7 deletions

View File

@ -35,6 +35,7 @@ require (
golang.org/x/net v0.0.0-20220722155237-a158d28d115b
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/grpc v1.47.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/square/go-jose.v2 v2.2.2
@ -108,7 +109,6 @@ require (
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/protobuf v1.28.1 // indirect

View File

@ -35,6 +35,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
@ -123,6 +124,30 @@ func newETCD3ReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func()
return newETCD3Check(c, timeout, stopCh)
}
// atomic error acts as a cache for atomically store an error
// the error is only updated if the timestamp is more recent than
// current stored error.
type atomicLastError struct {
mu sync.RWMutex
err error
timestamp time.Time
}
func (a *atomicLastError) Store(err error, t time.Time) {
a.mu.Lock()
defer a.mu.Unlock()
if a.timestamp.IsZero() || a.timestamp.Before(t) {
a.err = err
a.timestamp = t
}
}
func (a *atomicLastError) Load() error {
a.mu.RLock()
defer a.mu.RUnlock()
return a.err
}
func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
// constructing the etcd v3 client blocks and times out if etcd is not available.
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
@ -133,10 +158,8 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
go wait.PollUntil(time.Second, func() (bool, error) {
newClient, err := newETCD3Client(c.Transport)
lock.Lock()
defer lock.Unlock()
// Ensure that server is already not shutting down.
select {
case <-stopCh:
@ -146,7 +169,6 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
return true, nil
default:
}
if err != nil {
clientErr = err
return false, nil
@ -169,6 +191,12 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
}
}()
// limit to a request every half of the configured timeout with a maximum burst of one
// rate limited requests will receive the last request sent error (note: not the last received response)
limiter := rate.NewLimiter(rate.Every(timeout/2), 1)
// initial state is the clientErr
lastError := &atomicLastError{err: fmt.Errorf("etcd client connection not yet established")}
return func() error {
// Given that client is closed on shutdown we hold the lock for
// the entire period of healthcheck call to ensure that client will
@ -177,17 +205,23 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
// shutdown for additional 2s seems acceptable.
lock.RLock()
defer lock.RUnlock()
if clientErr != nil {
return clientErr
}
if limiter.Allow() == false {
return lastError.Load()
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
now := time.Now()
_, err := client.Get(ctx, path.Join("/", c.Prefix, "health"))
if err == nil {
return nil
if err != nil {
err = fmt.Errorf("error getting data from etcd: %w", err)
}
return fmt.Errorf("error getting data from etcd: %w", err)
lastError.Store(err, now)
return err
}, nil
}

View File

@ -0,0 +1,48 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"errors"
"fmt"
"testing"
"time"
)
func Test_atomicLastError(t *testing.T) {
aError := &atomicLastError{err: fmt.Errorf("initial error")}
// no timestamp is always updated
aError.Store(errors.New("updated error"), time.Time{})
err := aError.Load()
if err.Error() != "updated error" {
t.Fatalf("Expected: \"updated error\" got: %s", err.Error())
}
// update to current time
now := time.Now()
aError.Store(errors.New("now error"), now)
err = aError.Load()
if err.Error() != "now error" {
t.Fatalf("Expected: \"now error\" got: %s", err.Error())
}
// no update to past time
past := now.Add(-5 * time.Second)
aError.Store(errors.New("past error"), past)
err = aError.Load()
if err.Error() != "now error" {
t.Fatalf("Expected: \"now error\" got: %s", err.Error())
}
}

View File

@ -19,6 +19,10 @@ package factory
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -240,3 +244,208 @@ func TestCreateReadycheck(t *testing.T) {
})
}
}
func TestRateLimitHealthcheck(t *testing.T) {
etcdConfig := testserver.NewTestConfig(t)
client := testserver.RunEtcd(t, etcdConfig)
newETCD3ClientFn := newETCD3Client
defer func() {
newETCD3Client = newETCD3ClientFn
}()
cfg := storagebackend.Config{
Type: storagebackend.StorageTypeETCD3,
Transport: storagebackend.TransportConfig{},
HealthcheckTimeout: 5 * time.Second,
}
cfg.Transport.ServerList = client.Endpoints()
tests := []struct {
name string
want error
}{
{
name: "etcd ok",
},
{
name: "etcd down",
want: errors.New("etcd down"),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
atomic.AddUint64(&counter, 1)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, tc.want
}
},
}
client.KV = dummyKV
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateHealthCheck(cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
<-ready
// run a first request to obtain the state
err = healthcheck()
if !errors.Is(err, tc.want) {
t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
}
// run multiple request in parallel, they should have the same state that the first one
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := healthcheck()
if !errors.Is(err, tc.want) {
t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
}
}()
}
// check the counter once the requests have finished
wg.Wait()
if counter != 1 {
t.Errorf("healthcheck() called etcd %d times, expected only one call", counter)
}
// wait until the rate limit allows new connections
time.Sleep(cfg.HealthcheckTimeout / 2)
// a new run on request should increment the counter only once
// run multiple request in parallel, they should have the same state that the first one
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := healthcheck()
if !errors.Is(err, tc.want) {
t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
}
}()
}
wg.Wait()
if counter != 2 {
t.Errorf("healthcheck() called etcd %d times, expected only two calls", counter)
}
})
}
}
func TestTimeTravelHealthcheck(t *testing.T) {
etcdConfig := testserver.NewTestConfig(t)
client := testserver.RunEtcd(t, etcdConfig)
newETCD3ClientFn := newETCD3Client
defer func() {
newETCD3Client = newETCD3ClientFn
}()
cfg := storagebackend.Config{
Type: storagebackend.StorageTypeETCD3,
Transport: storagebackend.TransportConfig{},
HealthcheckTimeout: 5 * time.Second,
}
cfg.Transport.ServerList = client.Endpoints()
ready := make(chan struct{})
signal := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
atomic.AddUint64(&counter, 1)
val := atomic.LoadUint64(&counter)
// the first request wait for a custom timeout to trigger an error.
// We don't use the context timeout because we want to check that
// the cached answer is not overridden, and since the rate limit is
// based on cfg.HealthcheckTimeout / 2, the timeout will race with
// the race limiter to server the new request from the cache or allow
// it to go through
if val == 1 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After((2 * cfg.HealthcheckTimeout) / 3):
return nil, fmt.Errorf("etcd down")
}
}
// subsequent requests will always work
return nil, nil
},
}
client.KV = dummyKV
return client, nil
}
stop := make(chan struct{})
defer close(stop)
healthcheck, err := CreateHealthCheck(cfg, stop)
if err != nil {
t.Fatal(err)
}
// Wait for healthcheck to establish connection
<-ready
// run a first request that fails after 2 seconds
go func() {
err := healthcheck()
if !strings.Contains(err.Error(), "etcd down") {
t.Errorf("healthcheck() mismatch want %v got %v", fmt.Errorf("etcd down"), err)
}
close(signal)
}()
// wait until the rate limit allows new connections
time.Sleep(cfg.HealthcheckTimeout / 2)
select {
case <-signal:
t.Errorf("first request should not return yet")
default:
}
// a new run on request should succeed and increment the counter
err = healthcheck()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
c := atomic.LoadUint64(&counter)
if c != 2 {
t.Errorf("healthcheck() called etcd %d times, expected only two calls", c)
}
// cached request should be success and not be overridden by the late error
<-signal
err = healthcheck()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
c = atomic.LoadUint64(&counter)
if c != 2 {
t.Errorf("healthcheck() called etcd %d times, expected only two calls", c)
}
}