Update recorders to wrap kubernetes.Client

This commit is contained in:
Marek Siarkowicz 2024-07-05 14:19:49 +02:00
parent 249ad2a613
commit 066c1c05d7
9 changed files with 52 additions and 47 deletions

View File

@ -180,42 +180,42 @@ func TestList(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true)
}
func TestListWithConsistentListFromCache(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true)
}
func TestConsistentList(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, false)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, false)
}
func TestConsistentListWithConsistentListFromCache(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, true)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, true)
}
func TestGetListNonRecursive(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher)
}
func TestGetListNonRecursiveWithConsistentListFromCache(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher)
}
func TestGetListRecursivePrefix(t *testing.T) {
@ -301,7 +301,7 @@ func TestWatch(t *testing.T) {
func TestWatchFromZero(t *testing.T) {
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client))
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client))
}
func TestDeleteTriggerWatch(t *testing.T) {

View File

@ -27,7 +27,7 @@ import (
)
func TestCompact(t *testing.T) {
client := testserver.RunEtcd(t, nil)
client := testserver.RunEtcd(t, nil).Client
ctx := context.Background()
putResp, err := client.Put(ctx, "/somekey", "data")
@ -56,7 +56,7 @@ func TestCompact(t *testing.T) {
// - C1 compacts first. It will succeed.
// - C2 compacts after. It will fail. But it will get latest logical time, which should be larger by one.
func TestCompactConflict(t *testing.T) {
client := testserver.RunEtcd(t, nil)
client := testserver.RunEtcd(t, nil).Client
ctx := context.Background()
putResp, err := client.Put(ctx, "/somekey", "data")

View File

@ -27,7 +27,7 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
_ "go.etcd.io/etcd/client/v3/kubernetes"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.opentelemetry.io/otel/attribute"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -73,7 +73,7 @@ func (d authenticatedDataString) AuthenticatedData() []byte {
var _ value.Context = authenticatedDataString("")
type store struct {
client *clientv3.Client
client *kubernetes.Client
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
@ -100,11 +100,11 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner)
}
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
@ -115,7 +115,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
}
w := &watcher{
client: c,
client: c.Client,
codec: codec,
newFunc: newFunc,
groupResource: groupResource,
@ -136,7 +136,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
groupResource: groupResource,
groupResourceString: groupResource.String(),
watcher: w,
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig),
decoder: decoder,
}

View File

@ -29,6 +29,7 @@ import (
"github.com/go-logr/logr"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.etcd.io/etcd/server/v3/embed"
"google.golang.org/grpc/grpclog"
@ -95,7 +96,7 @@ func checkStorageInvariants(etcdClient *clientv3.Client, codec runtime.Codec) st
func TestCreate(t *testing.T) {
ctx, store, etcdClient := testSetup(t)
storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient, store.codec))
storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient.Client, store.codec))
}
func TestCreateWithTTL(t *testing.T) {
@ -170,7 +171,7 @@ func TestListPaging(t *testing.T) {
func TestGetListNonRecursive(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client), store)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client.Client), store)
}
func TestGetListRecursivePrefix(t *testing.T) {
@ -194,8 +195,8 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes
}
func TestGuaranteedUpdate(t *testing.T) {
ctx, store, etcdClient := testSetup(t)
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(etcdClient, store.codec))
ctx, store, client := testSetup(t)
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(client.Client, store.codec))
}
func TestGuaranteedUpdateWithTTL(t *testing.T) {
@ -225,12 +226,12 @@ func TestTransformationFailure(t *testing.T) {
func TestList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
storagetesting.RunTestList(ctx, t, store, compactStorage(client.Client), false)
}
func TestConsistentList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client), false, true)
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client.Client), false, true)
}
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
@ -258,29 +259,29 @@ func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer,
}
}
if reads := recorder.GetReadsAndReset(); reads != estimatedGetCalls {
t.Errorf("unexpected reads: %d", reads)
t.Fatalf("unexpected reads: %d, want: %d", reads, estimatedGetCalls)
}
}
}
func TestListContinuation(t *testing.T) {
ctx, store, etcdClient := testSetup(t, withRecorder())
ctx, store, client := testSetup(t, withRecorder())
validation := checkStorageCallsInvariants(
store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder))
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
storagetesting.RunTestListContinuation(ctx, t, store, validation)
}
func TestListPaginationRareObject(t *testing.T) {
ctx, store, etcdClient := testSetup(t, withRecorder())
ctx, store, client := testSetup(t, withRecorder())
validation := checkStorageCallsInvariants(
store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder))
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation)
}
func TestListContinuationWithFilter(t *testing.T) {
ctx, store, etcdClient := testSetup(t, withRecorder())
ctx, store, client := testSetup(t, withRecorder())
validation := checkStorageCallsInvariants(
store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder))
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation)
}
@ -299,7 +300,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
func TestListInconsistentContinuation(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client))
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client.Client))
}
func TestListResourceVersionMatch(t *testing.T) {
@ -499,7 +500,7 @@ func (r *clientRecorder) GetReadsAndReset() uint64 {
}
type setupOptions struct {
client func(testing.TB) *clientv3.Client
client func(testing.TB) *kubernetes.Client
codec runtime.Codec
newFunc func() runtime.Object
newListFunc func() runtime.Object
@ -516,7 +517,7 @@ type setupOption func(*setupOptions)
func withClientConfig(config *embed.Config) setupOption {
return func(options *setupOptions) {
options.client = func(t testing.TB) *clientv3.Client {
options.client = func(t testing.TB) *kubernetes.Client {
return testserver.RunEtcd(t, config)
}
}
@ -541,7 +542,7 @@ func withRecorder() setupOption {
}
func withDefaults(options *setupOptions) {
options.client = func(t testing.TB) *clientv3.Client {
options.client = func(t testing.TB) *kubernetes.Client {
return testserver.RunEtcd(t, nil)
}
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
@ -556,7 +557,7 @@ func withDefaults(options *setupOptions) {
var _ setupOption = withDefaults
func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *clientv3.Client) {
func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *kubernetes.Client) {
setupOpts := setupOptions{}
opts = append([]setupOption{withDefaults}, opts...)
for _, opt := range opts {

View File

@ -19,7 +19,7 @@ package testing
import (
"testing"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
"k8s.io/apiserver/pkg/storage/storagebackend"
@ -27,7 +27,7 @@ import (
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
type EtcdTestServer struct {
V3Client *clientv3.Client
V3Client *kubernetes.Client
}
func (e *EtcdTestServer) Terminate(t testing.TB) {

View File

@ -26,6 +26,7 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
@ -81,7 +82,7 @@ func NewTestConfig(t testing.TB) *embed.Config {
// RunEtcd starts an embedded etcd server with the provided config
// (or NewTestConfig(t) if nil), and returns a client connected to the server.
// The server is terminated when the test ends.
func RunEtcd(t testing.TB, cfg *embed.Config) *clientv3.Client {
func RunEtcd(t testing.TB, cfg *embed.Config) *kubernetes.Client {
t.Helper()
if cfg == nil {
@ -112,7 +113,7 @@ func RunEtcd(t testing.TB, cfg *embed.Config) *clientv3.Client {
t.Fatal(err)
}
client, err := clientv3.New(clientv3.Config{
client, err := kubernetes.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: e.Server.Cluster().ClientURLs(),
DialTimeout: 10 * time.Second,

View File

@ -64,7 +64,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
func TestWatchFromZero(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client))
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client.Client))
}
// TestWatchFromNonZero tests that

View File

@ -33,6 +33,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -228,7 +229,7 @@ func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error)
return nil, err
}
return &etcd3ProberMonitor{
client: client,
client: client.Client,
prefix: c.Prefix,
endpoints: c.Transport.ServerList,
}, nil
@ -282,7 +283,7 @@ func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetric
}, nil
}
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
var newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
@ -352,7 +353,7 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, e
Logger: etcd3ClientLogger,
}
return clientv3.New(cfg)
return kubernetes.New(cfg)
}
type runningCompactor struct {
@ -384,10 +385,11 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
}
key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile}
if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval {
compactorClient, err := newETCD3Client(c)
client, err := newETCD3Client(c)
if err != nil {
return nil, err
}
compactorClient := client.Client
if foundBefore {
// replace compactor
@ -439,7 +441,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
// decorate the KV instance so we can track etcd latency per request.
client.KV = etcd3.NewETCDLatencyTracker(client.KV)
stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client.Client, c.DBMetricPollInterval)
if err != nil {
return nil, nil, err
}

View File

@ -27,6 +27,7 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
@ -111,7 +112,7 @@ func TestCreateHealthcheck(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
tc.cfg.Transport.ServerList = client.Endpoints()
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
@ -211,7 +212,7 @@ func TestCreateReadycheck(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
tc.cfg.Transport.ServerList = client.Endpoints()
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
@ -277,7 +278,7 @@ func TestRateLimitHealthcheck(t *testing.T) {
ready := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
@ -373,7 +374,7 @@ func TestTimeTravelHealthcheck(t *testing.T) {
signal := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {