mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
In contrast to EtcdMain, it can be called by individual tests or benchmarks and each caller will get a fresh etcd instance. However, it uses the same underlying code and the same port for all instances, so tests cannot run in parallel.
233 lines
7.4 KiB
Go
233 lines
7.4 KiB
Go
/*
|
|
Copyright 2020 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package apiserver
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
|
"k8s.io/kubernetes/pkg/controlplane"
|
|
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
|
"k8s.io/kubernetes/test/integration/framework"
|
|
)
|
|
|
|
// setup create kube-apiserver backed up by two separate etcds,
|
|
// with one of them containing events and the other all other objects.
|
|
func multiEtcdSetup(t *testing.T) (clientset.Interface, framework.TearDownFunc) {
|
|
etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"}
|
|
etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs, nil)
|
|
if err != nil {
|
|
t.Fatalf("Couldn't start etcd: %v", err)
|
|
}
|
|
|
|
etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs, nil)
|
|
if err != nil {
|
|
t.Fatalf("Couldn't start etcd: %v", err)
|
|
}
|
|
|
|
etcdOptions := framework.DefaultEtcdOptions()
|
|
// Overwrite etcd setup to our custom etcd instances.
|
|
etcdOptions.StorageConfig.Transport.ServerList = []string{etcd0URL}
|
|
etcdOptions.EtcdServersOverrides = []string{fmt.Sprintf("/events#%s", etcd1URL)}
|
|
etcdOptions.EnableWatchCache = true
|
|
|
|
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
|
|
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
|
|
// Ensure we're using the same etcd across apiserver restarts.
|
|
opts.Etcd = etcdOptions
|
|
},
|
|
ModifyServerConfig: func(config *controlplane.Config) {
|
|
// Switch off endpoints reconciler to avoid unnecessary operations.
|
|
config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType
|
|
},
|
|
})
|
|
|
|
closeFn := func() {
|
|
tearDownFn()
|
|
stopEtcd1()
|
|
stopEtcd0()
|
|
}
|
|
|
|
// Wait for apiserver to be stabilized.
|
|
// Everything but default service creation is checked in StartTestServer above by
|
|
// waiting for post start hooks, so we just wait for default service to exist.
|
|
// TODO(wojtek-t): Figure out less fragile way.
|
|
ctx := context.Background()
|
|
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
|
_, err := clientSet.CoreV1().Services("default").Get(ctx, "kubernetes", metav1.GetOptions{})
|
|
return err == nil, nil
|
|
}); err != nil {
|
|
t.Fatalf("Failed to wait for kubernetes service: %v:", err)
|
|
}
|
|
return clientSet, closeFn
|
|
}
|
|
|
|
func TestWatchCacheUpdatedByEtcd(t *testing.T) {
|
|
c, closeFn := multiEtcdSetup(t)
|
|
defer closeFn()
|
|
|
|
ctx := context.Background()
|
|
|
|
makeConfigMap := func(name string) *v1.ConfigMap {
|
|
return &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: name}}
|
|
}
|
|
makeSecret := func(name string) *v1.Secret {
|
|
return &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}}
|
|
}
|
|
makeEvent := func(name string) *v1.Event {
|
|
return &v1.Event{ObjectMeta: metav1.ObjectMeta{Name: name}}
|
|
}
|
|
|
|
cm, err := c.CoreV1().ConfigMaps("default").Create(ctx, makeConfigMap("name"), metav1.CreateOptions{})
|
|
if err != nil {
|
|
t.Errorf("Couldn't create configmap: %v", err)
|
|
}
|
|
ev, err := c.CoreV1().Events("default").Create(ctx, makeEvent("name"), metav1.CreateOptions{})
|
|
if err != nil {
|
|
t.Errorf("Couldn't create event: %v", err)
|
|
}
|
|
|
|
listOptions := metav1.ListOptions{
|
|
ResourceVersion: "0",
|
|
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
|
|
}
|
|
|
|
// Wait until listing from cache returns resource version of corresponding
|
|
// resources (being the last updates).
|
|
t.Logf("Waiting for configmaps watchcache synced to %s", cm.ResourceVersion)
|
|
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
|
res, err := c.CoreV1().ConfigMaps("default").List(ctx, listOptions)
|
|
if err != nil {
|
|
return false, nil
|
|
}
|
|
return res.ResourceVersion == cm.ResourceVersion, nil
|
|
}); err != nil {
|
|
t.Errorf("Failed to wait for configmaps watchcache synced: %v", err)
|
|
}
|
|
t.Logf("Waiting for events watchcache synced to %s", ev.ResourceVersion)
|
|
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
|
res, err := c.CoreV1().Events("default").List(ctx, listOptions)
|
|
if err != nil {
|
|
return false, nil
|
|
}
|
|
return res.ResourceVersion == ev.ResourceVersion, nil
|
|
}); err != nil {
|
|
t.Errorf("Failed to wait for events watchcache synced: %v", err)
|
|
}
|
|
|
|
// Create a secret, that is stored in the same etcd as configmap, but
|
|
// different than events.
|
|
se, err := c.CoreV1().Secrets("default").Create(ctx, makeSecret("name"), metav1.CreateOptions{})
|
|
if err != nil {
|
|
t.Errorf("Couldn't create secret: %v", err)
|
|
}
|
|
|
|
t.Logf("Waiting for configmaps watchcache synced to %s", se.ResourceVersion)
|
|
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
|
res, err := c.CoreV1().ConfigMaps("default").List(ctx, listOptions)
|
|
if err != nil {
|
|
return false, nil
|
|
}
|
|
return res.ResourceVersion == se.ResourceVersion, nil
|
|
}); err != nil {
|
|
t.Errorf("Failed to wait for configmaps watchcache synced: %v", err)
|
|
}
|
|
t.Logf("Waiting for events watchcache NOT synced to %s", se.ResourceVersion)
|
|
if err := wait.Poll(100*time.Millisecond, 5*time.Second, func() (bool, error) {
|
|
res, err := c.CoreV1().Events("default").List(ctx, listOptions)
|
|
if err != nil {
|
|
return false, nil
|
|
}
|
|
return res.ResourceVersion == se.ResourceVersion, nil
|
|
}); err == nil || err != wait.ErrWaitTimeout {
|
|
t.Errorf("Events watchcache unexpected synced: %v", err)
|
|
}
|
|
}
|
|
|
|
func BenchmarkListFromWatchCache(b *testing.B) {
|
|
c, _, tearDownFn := framework.StartTestServer(b, framework.TestServerSetup{
|
|
ModifyServerConfig: func(config *controlplane.Config) {
|
|
// Switch off endpoints reconciler to avoid unnecessary operations.
|
|
config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType
|
|
},
|
|
})
|
|
defer tearDownFn()
|
|
|
|
namespaces, secretsPerNamespace := 100, 1000
|
|
wg := sync.WaitGroup{}
|
|
|
|
errCh := make(chan error, namespaces)
|
|
for i := 0; i < namespaces; i++ {
|
|
wg.Add(1)
|
|
index := i
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
ctx := context.Background()
|
|
ns := &v1.Namespace{
|
|
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("namespace-%d", index)},
|
|
}
|
|
ns, err := c.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
|
|
if err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
for j := 0; j < secretsPerNamespace; j++ {
|
|
secret := &v1.Secret{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: fmt.Sprintf("secret-%d", j),
|
|
},
|
|
}
|
|
_, err := c.CoreV1().Secrets(ns.Name).Create(ctx, secret, metav1.CreateOptions{})
|
|
if err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errCh)
|
|
for err := range errCh {
|
|
b.Error(err)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
|
|
ctx := context.Background()
|
|
opts := metav1.ListOptions{
|
|
ResourceVersion: "0",
|
|
}
|
|
for i := 0; i < b.N; i++ {
|
|
secrets, err := c.CoreV1().Secrets("").List(ctx, opts)
|
|
if err != nil {
|
|
b.Errorf("failed to list secrets: %v", err)
|
|
}
|
|
b.Logf("Number of secrets: %d", len(secrets.Items))
|
|
}
|
|
}
|