From 5a67248115ec0bc762c0351a73810117ac0bb814 Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Mon, 29 Aug 2022 17:57:01 +0200
Subject: [PATCH 1/3] don't serialize etcd healthchecks

---
 .../apiserver/pkg/storage/storagebackend/factory/etcd3.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
index 55e2cc0b42c..ef684678a08 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
@@ -127,7 +127,7 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
 	// constructing the etcd v3 client blocks and times out if etcd is not available.
 	// retry in a loop in the background until we successfully create the client, storing the client or error encountered
 
-	lock := sync.Mutex{}
+	lock := sync.RWMutex{}
 	var client *clientv3.Client
 	clientErr := fmt.Errorf("etcd client connection not yet established")
 
@@ -175,8 +175,8 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
 		// not be closed during healthcheck.
 		// Given that healthchecks has a 2s timeout, worst case of blocking
 		// shutdown for additional 2s seems acceptable.
-		lock.Lock()
-		defer lock.Unlock()
+		lock.RLock()
+		defer lock.RUnlock()
 		if clientErr != nil {
 			return clientErr
 		}

From dd6d3d95cdeb0e165e8365212d85d0f3b972d3e8 Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Sun, 28 Aug 2022 23:13:45 +0200
Subject: [PATCH 2/3] fix etcd unit tests

stop leaking goroutines
reduce etcd test duration
---
 .../storage/storagebackend/factory/factory_test.go | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go
index 963d24c9112..36fe8116fd4 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go
@@ -105,8 +105,10 @@ func TestCreateHealthcheck(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
+			ready := make(chan struct{})
 			tc.cfg.Transport.ServerList = client.Endpoints()
 			newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
+				defer close(ready)
 				dummyKV := mockKV{
 					get: func(ctx context.Context) (*clientv3.GetResponse, error) {
 						select {
@@ -121,13 +123,14 @@ func TestCreateHealthcheck(t *testing.T) {
 				return client, nil
 			}
 			stop := make(chan struct{})
+			defer close(stop)
+
 			healthcheck, err := CreateHealthCheck(tc.cfg, stop)
 			if err != nil {
 				t.Fatal(err)
 			}
 			// Wait for healthcheck to establish connection
-			time.Sleep(2 * time.Second)
-
+			<-ready
 			got := healthcheck()
 
 			if !errors.Is(got, tc.want) {
@@ -202,8 +205,10 @@ func TestCreateReadycheck(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
+			ready := make(chan struct{})
 			tc.cfg.Transport.ServerList = client.Endpoints()
 			newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
+				defer close(ready)
 				dummyKV := mockKV{
 					get: func(ctx context.Context) (*clientv3.GetResponse, error) {
 						select {
@@ -218,12 +223,14 @@ func TestCreateReadycheck(t *testing.T) {
 				return client, nil
 			}
 			stop := make(chan struct{})
+			defer close(stop)
+
+			healthcheck, err := CreateReadyCheck(tc.cfg, stop)
 			if err != nil {
 				t.Fatal(err)
 			}
 			// Wait for healthcheck to establish connection
-			time.Sleep(2 * time.Second)
+			<-ready
 
 			got := healthcheck()

From 510a85c53a5138babb1650fadd328e6f34baa03b Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Mon, 29 Aug 2022 11:09:58 +0200
Subject: [PATCH 3/3] rate limit /healthz etcd healthchecks

Return the error of the last request sent, instead of the last error
received. The rate limit allows one request per healthcheck timeout / 2.
---
 staging/src/k8s.io/apiserver/go.mod           |   2 +-
 .../storage/storagebackend/factory/etcd3.go   |  46 +++-
 .../storagebackend/factory/etcd3_test.go      |  48 ++++
 .../storagebackend/factory/factory_test.go    | 209 ++++++++++++++++++
 4 files changed, 298 insertions(+), 7 deletions(-)
 create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3_test.go

diff --git a/staging/src/k8s.io/apiserver/go.mod b/staging/src/k8s.io/apiserver/go.mod
index eb78bd80075..95bbecf4500 100644
--- a/staging/src/k8s.io/apiserver/go.mod
+++ b/staging/src/k8s.io/apiserver/go.mod
@@ -35,6 +35,7 @@ require (
 	golang.org/x/net v0.0.0-20220722155237-a158d28d115b
 	golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
 	golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f
+	golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
 	google.golang.org/grpc v1.47.0
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 	gopkg.in/square/go-jose.v2 v2.2.2
@@ -108,7 +109,6 @@ require (
 	golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
 	golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
 	golang.org/x/text v0.3.7 // indirect
-	golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
 	google.golang.org/protobuf v1.28.1 // indirect
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
index ef684678a08..e23c360b8eb 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
@@ -35,6 +35,7 @@ import (
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
+	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
 
 	"k8s.io/apimachinery/pkg/runtime"
@@ -123,6 +124,30 @@ func newETCD3ReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func()
 	return newETCD3Check(c, timeout, stopCh)
 }
 
+// atomicLastError acts as a cache that atomically stores an error;
+// the error is only updated if the given timestamp is more recent
+// than the timestamp of the currently stored error.
+type atomicLastError struct {
+	mu        sync.RWMutex
+	err       error
+	timestamp time.Time
+}
+
+func (a *atomicLastError) Store(err error, t time.Time) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	if a.timestamp.IsZero() || a.timestamp.Before(t) {
+		a.err = err
+		a.timestamp = t
+	}
+}
+
+func (a *atomicLastError) Load() error {
+	a.mu.RLock()
+	defer a.mu.RUnlock()
+	return a.err
+}
+
 func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
 	// constructing the etcd v3 client blocks and times out if etcd is not available.
 	// retry in a loop in the background until we successfully create the client, storing the client or error encountered
@@ -133,10 +158,8 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
 
 	go wait.PollUntil(time.Second, func() (bool, error) {
 		newClient, err := newETCD3Client(c.Transport)
-
 		lock.Lock()
 		defer lock.Unlock()
-
 		// Ensure that server is already not shutting down.
 		select {
 		case <-stopCh:
@@ -146,7 +169,6 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
 			return true, nil
 		default:
 		}
-
 		if err != nil {
 			clientErr = err
 			return false, nil
@@ -169,6 +191,12 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
 		}
 	}()
 
+	// Allow one request every half of the configured timeout, with a maximum burst of one.
+	// Rate-limited calls receive the error of the last request actually sent (note: not the last response received).
+	limiter := rate.NewLimiter(rate.Every(timeout/2), 1)
+	// the initial state is the initial clientErr
+	lastError := &atomicLastError{err: fmt.Errorf("etcd client connection not yet established")}
+
 	return func() error {
 		// Given that client is closed on shutdown we hold the lock for
 		// the entire period of healthcheck call to ensure that client will
@@ -177,17 +205,23 @@ func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan
 		// shutdown for additional 2s seems acceptable.
 		lock.RLock()
 		defer lock.RUnlock()
+
 		if clientErr != nil {
 			return clientErr
 		}
+		if !limiter.Allow() {
+			return lastError.Load()
+		}
 		ctx, cancel := context.WithTimeout(context.Background(), timeout)
 		defer cancel()
 		// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
+		now := time.Now()
 		_, err := client.Get(ctx, path.Join("/", c.Prefix, "health"))
-		if err == nil {
-			return nil
+		if err != nil {
+			err = fmt.Errorf("error getting data from etcd: %w", err)
 		}
-		return fmt.Errorf("error getting data from etcd: %w", err)
+		lastError.Store(err, now)
+		return err
 	}, nil
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3_test.go
new file mode 100644
index 00000000000..4ed8b1c232b
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3_test.go
@@ -0,0 +1,48 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package factory
+
+import (
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+)
+
+func Test_atomicLastError(t *testing.T) {
+	aError := &atomicLastError{err: fmt.Errorf("initial error")}
+	// a cached error without a timestamp is always updated
+	aError.Store(errors.New("updated error"), time.Time{})
+	err := aError.Load()
+	if err.Error() != "updated error" {
+		t.Fatalf("Expected: \"updated error\" got: %s", err.Error())
+	}
+	// a store with the current time updates the cache
+	now := time.Now()
+	aError.Store(errors.New("now error"), now)
+	err = aError.Load()
+	if err.Error() != "now error" {
+		t.Fatalf("Expected: \"now error\" got: %s", err.Error())
+	}
+	// a store with a timestamp in the past does not update the cache
+	past := now.Add(-5 * time.Second)
+	aError.Store(errors.New("past error"), past)
+	err = aError.Load()
+	if err.Error() != "now error" {
+		t.Fatalf("Expected: \"now error\" got: %s", err.Error())
+	}
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go
index 36fe8116fd4..7b9106dd9e6 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go
@@ -19,6 +19,10 @@ package factory
 import (
 	"context"
 	"errors"
+	"fmt"
+	"strings"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -240,3 +244,208 @@ func TestCreateReadycheck(t *testing.T) {
 		})
 	}
 }
+
+func TestRateLimitHealthcheck(t *testing.T) {
+	etcdConfig := testserver.NewTestConfig(t)
+	client := testserver.RunEtcd(t, etcdConfig)
+	newETCD3ClientFn := newETCD3Client
+	defer func() {
+		newETCD3Client = newETCD3ClientFn
+	}()
+
+	cfg := storagebackend.Config{
+		Type:               storagebackend.StorageTypeETCD3,
+		Transport:          storagebackend.TransportConfig{},
+		HealthcheckTimeout: 5 * time.Second,
+	}
+	cfg.Transport.ServerList = client.Endpoints()
+	tests := []struct {
+		name string
+		want error
+	}{
+		{
+			name: "etcd ok",
+		},
+		{
+			name: "etcd down",
+			want: errors.New("etcd down"),
+		},
+	}
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+
+			ready := make(chan struct{})
+
+			var counter uint64
+			newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
+				defer close(ready)
+				dummyKV := mockKV{
+					get: func(ctx context.Context) (*clientv3.GetResponse, error) {
+						atomic.AddUint64(&counter, 1)
+						select {
+						case <-ctx.Done():
+							return nil, ctx.Err()
+						default:
+							return nil, tc.want
+						}
+					},
+				}
+				client.KV = dummyKV
+				return client, nil
+			}
+
+			stop := make(chan struct{})
+			defer close(stop)
+			healthcheck, err := CreateHealthCheck(cfg, stop)
+			if err != nil {
+				t.Fatal(err)
+			}
+			// Wait for healthcheck to establish connection
+			<-ready
+			// run a first request to obtain the state
+			err = healthcheck()
+			if !errors.Is(err, tc.want) {
+				t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
+			}
+
+			// run multiple requests in parallel; they should observe the same state as the first one
+			var wg sync.WaitGroup
+			for i := 0; i < 100; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					err := healthcheck()
+					if !errors.Is(err, tc.want) {
+						t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
+					}
+
+				}()
+			}
+
+			// check the counter once the requests have finished
+			wg.Wait()
+			if counter != 1 {
+				t.Errorf("healthcheck() called etcd %d times, expected only one call", counter)
+			}
+
+			// wait until the rate limit allows new connections
+			time.Sleep(cfg.HealthcheckTimeout / 2)
+
+			// a new round of requests should increment the counter only once;
+			// run multiple requests in parallel, they should observe the same state as the first one
+			for i := 0; i < 100; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					err := healthcheck()
+					if !errors.Is(err, tc.want) {
+						t.Errorf("healthcheck() mismatch want %v got %v", tc.want, err)
+					}
+
+				}()
+			}
+			wg.Wait()
+
+			if counter != 2 {
+				t.Errorf("healthcheck() called etcd %d times, expected only two calls", counter)
+			}
+		})
+	}
+
+}
+
+func TestTimeTravelHealthcheck(t *testing.T) {
+	etcdConfig := testserver.NewTestConfig(t)
+	client := testserver.RunEtcd(t, etcdConfig)
+	newETCD3ClientFn := newETCD3Client
+	defer func() {
+		newETCD3Client = newETCD3ClientFn
+	}()
+
+	cfg := storagebackend.Config{
+		Type:               storagebackend.StorageTypeETCD3,
+		Transport:          storagebackend.TransportConfig{},
+		HealthcheckTimeout: 5 * time.Second,
+	}
+	cfg.Transport.ServerList = client.Endpoints()
+
+	ready := make(chan struct{})
+	signal := make(chan struct{})
+
+	var counter uint64
+	newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
+		defer close(ready)
+		dummyKV := mockKV{
+			get: func(ctx context.Context) (*clientv3.GetResponse, error) {
+				atomic.AddUint64(&counter, 1)
+				val := atomic.LoadUint64(&counter)
+				// The first request waits for a custom timeout to trigger an error.
+				// We don't use the context timeout because we want to check that
+				// the cached answer is not overridden, and since the rate limit is
+				// based on cfg.HealthcheckTimeout / 2, the timeout will race with
+				// the rate limiter to serve the new request from the cache or allow
+				// it to go through.
+				if val == 1 {
+					select {
+					case <-ctx.Done():
+						return nil, ctx.Err()
+					case <-time.After((2 * cfg.HealthcheckTimeout) / 3):
+						return nil, fmt.Errorf("etcd down")
+					}
+				}
+				// subsequent requests will always work
+				return nil, nil
+			},
+		}
+		client.KV = dummyKV
+		return client, nil
+	}
+
+	stop := make(chan struct{})
+	defer close(stop)
+	healthcheck, err := CreateHealthCheck(cfg, stop)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Wait for healthcheck to establish connection
+	<-ready
+	// run a first request that fails after 2/3 of the healthcheck timeout
+	go func() {
+		err := healthcheck()
+		if !strings.Contains(err.Error(), "etcd down") {
+			t.Errorf("healthcheck() mismatch want %v got %v", fmt.Errorf("etcd down"), err)
+		}
+		close(signal)
+	}()
+
+	// wait until the rate limit allows new connections
+	time.Sleep(cfg.HealthcheckTimeout / 2)
+
+	select {
+	case <-signal:
+		t.Errorf("first request should not return yet")
+	default:
+	}
+
+	// a new request should succeed and increment the counter
+	err = healthcheck()
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	c := atomic.LoadUint64(&counter)
+	if c != 2 {
+		t.Errorf("healthcheck() called etcd %d times, expected only two calls", c)
+	}
+
+	// the cached result should be a success and must not be overridden by the late error
+	<-signal
+	err = healthcheck()
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	c = atomic.LoadUint64(&counter)
+	if c != 2 {
+		t.Errorf("healthcheck() called etcd %d times, expected only two calls", c)
+	}
+
+}
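
Usage note (a minimal sketch, not part of the patches above): the check
returned by CreateHealthCheck is what the apiserver wires into its /healthz
machinery. The standalone wiring below is illustrative; the etcd endpoint URL
and the plain net/http server are assumptions, while the CreateHealthCheck
signature and the storagebackend.Config fields are taken from the patches.

	package main

	import (
		"fmt"
		"net/http"
		"time"

		"k8s.io/apiserver/pkg/storage/storagebackend"
		"k8s.io/apiserver/pkg/storage/storagebackend/factory"
	)

	func main() {
		// Config mirroring the tests above; the etcd endpoint is illustrative.
		cfg := storagebackend.Config{
			Type:               storagebackend.StorageTypeETCD3,
			HealthcheckTimeout: 2 * time.Second,
		}
		cfg.Transport.ServerList = []string{"http://127.0.0.1:2379"}

		stop := make(chan struct{})
		defer close(stop)

		// With these patches the returned func probes etcd at most once per
		// HealthcheckTimeout/2; callers arriving in between get the cached
		// result (error or nil) of the last probe actually sent.
		healthcheck, err := factory.CreateHealthCheck(cfg, stop)
		if err != nil {
			panic(err)
		}

		// Hammering /healthz no longer turns into a flood of etcd requests.
		http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
			if err := healthcheck(); err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			fmt.Fprintln(w, "ok")
		})
		http.ListenAndServe(":8080", nil)
	}

rate.Every(timeout/2) with a burst of one bounds etcd load to roughly two
probes per timeout window while keeping the cached health signal at most half
a timeout stale.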