test/integration: add StartEtcd

In contrast to EtcdMain, it can be called by individual tests or benchmarks and
each caller will get a fresh etcd instance. However, it uses the same
underlying code and the same port for all instances, so tests cannot run in
parallel.
This commit is contained in:
Patrick Ohly 2023-01-31 10:22:13 +01:00
parent 12ceec47aa
commit c008732948
3 changed files with 69 additions and 24 deletions

View File

@ -37,12 +37,12 @@ import (
// with one of them containing events and the other all other objects. // with one of them containing events and the other all other objects.
func multiEtcdSetup(t *testing.T) (clientset.Interface, framework.TearDownFunc) { func multiEtcdSetup(t *testing.T) (clientset.Interface, framework.TearDownFunc) {
etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"} etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"}
etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs) etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs, nil)
if err != nil { if err != nil {
t.Fatalf("Couldn't start etcd: %v", err) t.Fatalf("Couldn't start etcd: %v", err)
} }
etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs) etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs, nil)
if err != nil { if err != nil {
t.Fatalf("Couldn't start etcd: %v", err) t.Fatalf("Couldn't start etcd: %v", err)
} }

View File

@ -26,6 +26,7 @@ import (
"os/exec" "os/exec"
"strings" "strings"
"syscall" "syscall"
"testing"
"time" "time"
"go.uber.org/goleak" "go.uber.org/goleak"
@ -62,7 +63,7 @@ func getAvailablePort() (int, error) {
// startEtcd executes an etcd instance. The returned function will signal the // startEtcd executes an etcd instance. The returned function will signal the
// etcd process and wait for it to exit. // etcd process and wait for it to exit.
func startEtcd() (func(), error) { func startEtcd(output io.Writer) (func(), error) {
etcdURL := env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379") etcdURL := env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
conn, err := net.Dial("tcp", strings.TrimPrefix(etcdURL, "http://")) conn, err := net.Dial("tcp", strings.TrimPrefix(etcdURL, "http://"))
if err == nil { if err == nil {
@ -72,7 +73,7 @@ func startEtcd() (func(), error) {
} }
klog.V(1).Infof("could not connect to etcd: %v", err) klog.V(1).Infof("could not connect to etcd: %v", err)
currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil) currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil, output)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -83,7 +84,7 @@ func startEtcd() (func(), error) {
} }
// RunCustomEtcd starts a custom etcd instance for test purposes. // RunCustomEtcd starts a custom etcd instance for test purposes.
func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn func(), err error) { func RunCustomEtcd(dataDir string, customFlags []string, output io.Writer) (url string, stopFn func(), err error) {
// TODO: Check for valid etcd version. // TODO: Check for valid etcd version.
etcdPath, err := getEtcdPath() etcdPath, err := getEtcdPath()
if err != nil { if err != nil {
@ -119,8 +120,13 @@ func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn fun
} }
args = append(args, customFlags...) args = append(args, customFlags...)
cmd := exec.CommandContext(ctx, etcdPath, args...) cmd := exec.CommandContext(ctx, etcdPath, args...)
cmd.Stdout = os.Stdout if output == nil {
cmd.Stderr = os.Stderr cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
} else {
cmd.Stdout = output
cmd.Stderr = output
}
stop := func() { stop := func() {
// try to exit etcd gracefully // try to exit etcd gracefully
defer cancel() defer cancel()
@ -194,7 +200,7 @@ func EtcdMain(tests func() int) {
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"), goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
) )
stop, err := startEtcd() stop, err := startEtcd(nil)
if err != nil { if err != nil {
klog.Fatalf("cannot run integration tests: unable to start etcd: %v", err) klog.Fatalf("cannot run integration tests: unable to start etcd: %v", err)
} }
@ -202,27 +208,32 @@ func EtcdMain(tests func() int) {
stop() // Don't defer this. See os.Exit documentation. stop() // Don't defer this. See os.Exit documentation.
klog.StopFlushDaemon() klog.StopFlushDaemon()
// Several tests don't wait for goroutines to stop. goleak.Find retries if err := goleakFindRetry(goleakOpts...); err != nil {
// internally, but not long enough. 5 seconds seemed to be enough for klog.ErrorS(err, "EtcdMain goroutine check")
// most tests, even when testing in the CI. result = 1
timeout := 5 * time.Second
start := time.Now()
for {
err := goleak.Find(goleakOpts...)
if err == nil {
break
}
if time.Now().Sub(start) >= timeout {
klog.ErrorS(err, "EtcdMain goroutine check")
result = 1
break
}
} }
os.Exit(result) os.Exit(result)
} }
// GetEtcdURL returns the URL of the etcd instance started by EtcdMain. // GetEtcdURL returns the URL of the etcd instance started by EtcdMain or StartEtcd.
func GetEtcdURL() string { func GetEtcdURL() string {
return env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379") return env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
} }
// StartEtcd starts an etcd instance inside a test. It will abort the test if
// startup fails and clean up after the test automatically. Stdout and stderr
// of the etcd binary go to the provided writer.
//
// In contrast to EtcdMain, StartEtcd will not do automatic leak checking.
// Tests can decide if and where they want to do that.
//
// Starting etcd multiple times per test run instead of once with EtcdMain
// provides better separation between different tests.
func StartEtcd(tb testing.TB, etcdOutput io.Writer) {
stop, err := startEtcd(etcdOutput)
if err != nil {
tb.Fatalf("unable to start etcd: %v", err)
}
tb.Cleanup(stop)
}

View File

@ -17,6 +17,9 @@ limitations under the License.
package framework package framework
import ( import (
"testing"
"time"
"go.uber.org/goleak" "go.uber.org/goleak"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
) )
@ -34,3 +37,34 @@ func IgnoreBackgroundGoroutines() []goleak.Option {
return []goleak.Option{goleak.IgnoreCurrent()} return []goleak.Option{goleak.IgnoreCurrent()}
} }
// GoleakCheck sets up leak checking for a test or benchmark.
// The check runs as cleanup operation and records an
// error when goroutines were leaked.
func GoleakCheck(tb testing.TB, opts ...goleak.Option) {
// Must be called *before* creating new goroutines.
opts = append(opts, IgnoreBackgroundGoroutines()...)
tb.Cleanup(func() {
if err := goleakFindRetry(opts...); err != nil {
tb.Error(err.Error())
}
})
}
func goleakFindRetry(opts ...goleak.Option) error {
// Several tests don't wait for goroutines to stop. goleak.Find retries
// internally, but not long enough. 5 seconds seemed to be enough for
// most tests, even when testing in the CI.
timeout := 5 * time.Second
start := time.Now()
for {
err := goleak.Find(opts...)
if err == nil {
return nil
}
if time.Now().Sub(start) >= timeout {
return err
}
}
}