Merge pull request #79527 from wojtek-t/cleanup_etcd_dir_1

Cleanup etcd code
This commit is contained in:
Kubernetes Prow Robot 2019-06-29 07:37:22 -07:00 committed by GitHub
commit 6a2d0f67d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 397 additions and 638 deletions

View File

@ -455,8 +455,6 @@ staging/src/k8s.io/apiserver/pkg/server/httplog
staging/src/k8s.io/apiserver/pkg/server/options
staging/src/k8s.io/apiserver/pkg/server/storage
staging/src/k8s.io/apiserver/pkg/storage
staging/src/k8s.io/apiserver/pkg/storage/etcd/etcdtest
staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics
staging/src/k8s.io/apiserver/pkg/storage/etcd/testing
staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/testingcert
staging/src/k8s.io/apiserver/pkg/storage/etcd/util

View File

@ -20,7 +20,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -25,7 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/etcd3"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
@ -425,7 +425,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
}
var (
versioner = etcd.APIObjectVersioner{}
versioner = etcd3.APIObjectVersioner{}
)
// checkBindings runs through all the PVCs in the Pod and checks:

View File

@ -21,7 +21,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -23,7 +23,7 @@ import (
"time"
"k8s.io/api/core/v1"
storageetcd "k8s.io/apiserver/pkg/storage/etcd"
storageetcd3 "k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/kubernetes/pkg/kubelet/util"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -85,8 +85,8 @@ func isObjectOlder(newObject, oldObject runtime.Object) bool {
if newObject == nil || oldObject == nil {
return false
}
newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newObject)
oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldObject)
newVersion, _ := storageetcd3.Versioner.ObjectResourceVersion(newObject)
oldVersion, _ := storageetcd3.Versioner.ObjectResourceVersion(oldObject)
return newVersion < oldVersion
}

View File

@ -37,7 +37,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/initializer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",

View File

@ -25,7 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
)
@ -88,7 +88,7 @@ func (e *quotaAccessor) UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error
return nil
}
var etcdVersioner = etcd.APIObjectVersioner{}
var etcdVersioner = etcd3.APIObjectVersioner{}
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions

View File

@ -40,8 +40,8 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
@ -81,8 +81,8 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/errors:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:go_default_library",

View File

@ -25,7 +25,7 @@ import (
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
)
@ -55,7 +55,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
cacherConfig := cacherstorage.Config{
CacheCapacity: capacity,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
Versioner: etcd3.APIObjectVersioner{},
ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc,
NewFunc: newFunc,

View File

@ -44,7 +44,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
storeerr "k8s.io/apiserver/pkg/storage/errors"
"k8s.io/apiserver/pkg/storage/etcd/metrics"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/util/dryrun"
"k8s.io/klog"

View File

@ -49,8 +49,8 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
@ -249,7 +249,7 @@ func TestStoreListResourceVersion(t *testing.T) {
t.Fatal(err)
}
versioner := etcdstorage.APIObjectVersioner{}
versioner := etcd3.APIObjectVersioner{}
rev, err := versioner.ObjectResourceVersion(obj)
if err != nil {
t.Fatal(err)
@ -1553,7 +1553,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
config := cacherstorage.Config{
CacheCapacity: 10,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
Versioner: etcd3.APIObjectVersioner{},
ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs,

View File

@ -25,7 +25,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/github.com/go-openapi/spec:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",

View File

@ -22,7 +22,7 @@ import (
apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/server/mux"
etcdmetrics "k8s.io/apiserver/pkg/storage/etcd/metrics"
etcd3metrics "k8s.io/apiserver/pkg/storage/etcd3/metrics"
"github.com/prometheus/client_golang/prometheus"
)
@ -47,7 +47,7 @@ func (m MetricsWithReset) Install(c *mux.PathRecorderMux) {
c.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
if req.Method == "DELETE" {
apimetrics.Reset()
etcdmetrics.Reset()
etcd3metrics.Reset()
io.WriteString(w, "metrics reset\n")
return
}
@ -58,5 +58,5 @@ func (m MetricsWithReset) Install(c *mux.PathRecorderMux) {
// register apiserver and etcd metrics
func register() {
apimetrics.Register()
etcdmetrics.Register()
etcd3metrics.Register()
}

View File

@ -61,7 +61,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",

View File

@ -34,7 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/client-go/tools/cache"
)
@ -75,7 +75,7 @@ func newTestWatchCache(capacity int) *watchCache {
}
return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
}
versioner := etcd.APIObjectVersioner{}
versioner := etcd3.APIObjectVersioner{}
mockHandler := func(*watchCacheEvent) {}
wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner)
wc.clock = clock.NewFakeClock(time.Now())

View File

@ -1,36 +1,12 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["api_object_versioner_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
],
)
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"api_object_versioner.go",
"doc.go",
],
srcs = ["doc.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd",
importpath = "k8s.io/apiserver/pkg/storage/etcd",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
],
)
filegroup(
@ -44,8 +20,6 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/etcdtest:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/util:all-srcs",
],

View File

@ -1,29 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"etcdtest.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd/etcdtest",
importpath = "k8s.io/apiserver/pkg/storage/etcd/etcdtest",
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,17 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdtest // import "k8s.io/apiserver/pkg/storage/etcd/etcdtest"

View File

@ -1,36 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdtest
import (
"os"
"path"
)
// Returns the prefix set via the ETCD_PREFIX environment variable (if any).
func PathPrefix() string {
pref := os.Getenv("ETCD_PREFIX")
if pref == "" {
pref = "registry"
}
return path.Join("/", pref)
}
// Adds the ETCD_PREFIX to the provided key
func AddPrefix(in string) string {
return path.Join(PathPrefix(), in)
}

View File

@ -7,12 +7,14 @@ load(
go_library(
name = "go_default_library",
srcs = ["utils.go"],
srcs = [
"test_server.go",
"utils.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd/testing",
importpath = "k8s.io/apiserver/pkg/storage/etcd/testing",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/etcdtest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/testingcert:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/github.com/coreos/etcd/client:go_default_library",

View File

@ -0,0 +1,303 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"path"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/etcd/testing/testingcert"
"k8s.io/apiserver/pkg/storage/storagebackend"
"context"
etcd "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"k8s.io/klog"
)
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
type EtcdTestServer struct {
// The following are lumped etcd2 test server params
// TODO: Deprecate in a post 1.5 release
etcdserver.ServerConfig
PeerListeners, ClientListeners []net.Listener
Client etcd.Client
CertificatesDir string
CertFile string
KeyFile string
CAFile string
raftHandler http.Handler
s *etcdserver.EtcdServer
hss []*httptest.Server
// The following are lumped etcd3 test server params
v3Cluster *integration.ClusterV3
V3Client *clientv3.Client
}
// newLocalListener opens a port localhost using any port
func newLocalListener(t *testing.T) net.Listener {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
return l
}
// newSecuredLocalListener opens a port localhost using any port
// with SSL enable
func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
var l net.Listener
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
tlsInfo := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
tlscfg, err := tlsInfo.ServerConfig()
if err != nil {
t.Fatalf("unexpected serverConfig error: %v", err)
}
l, err = transport.NewKeepAliveListener(l, "https", tlscfg)
if err != nil {
t.Fatal(err)
}
return l
}
func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
tlsInfo := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
tr, err := transport.NewTransport(tlsInfo, time.Second)
if err != nil {
t.Fatal(err)
}
return tr
}
// configureTestCluster will set the params to start an etcd server
func configureTestCluster(t *testing.T, name string, https bool) *EtcdTestServer {
var err error
m := &EtcdTestServer{}
pln := newLocalListener(t)
m.PeerListeners = []net.Listener{pln}
m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()})
if err != nil {
t.Fatal(err)
}
// Allow test launches to control where etcd data goes, for space or performance reasons
baseDir := os.Getenv("TEST_ETCD_DIR")
if len(baseDir) == 0 {
baseDir = os.TempDir()
}
if https {
m.CertificatesDir, err = ioutil.TempDir(baseDir, "etcd_certificates")
if err != nil {
t.Fatal(err)
}
m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
if err = ioutil.WriteFile(m.CertFile, []byte(testingcert.CertFileContent), 0644); err != nil {
t.Fatal(err)
}
m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
if err = ioutil.WriteFile(m.KeyFile, []byte(testingcert.KeyFileContent), 0644); err != nil {
t.Fatal(err)
}
m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
if err = ioutil.WriteFile(m.CAFile, []byte(testingcert.CAFileContent), 0644); err != nil {
t.Fatal(err)
}
cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
m.ClientListeners = []net.Listener{cln}
m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
if err != nil {
t.Fatal(err)
}
} else {
cln := newLocalListener(t)
m.ClientListeners = []net.Listener{cln}
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
if err != nil {
t.Fatal(err)
}
}
m.AuthToken = "simple"
m.Name = name
m.DataDir, err = ioutil.TempDir(baseDir, "etcd")
if err != nil {
t.Fatal(err)
}
clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
t.Fatal(err)
}
m.InitialClusterToken = "TestEtcd"
m.NewCluster = true
m.ForceNewCluster = false
m.ElectionTicks = 10
m.TickMs = uint(10)
return m
}
// launch will attempt to start the etcd server
func (m *EtcdTestServer) launch(t *testing.T) error {
var err error
if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
return fmt.Errorf("failed to initialize the etcd server: %v", err)
}
m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: m.raftHandler},
}
hs.Start()
m.hss = append(m.hss, hs)
}
for _, ln := range m.ClientListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: v2http.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
}
hs.Start()
m.hss = append(m.hss, hs)
}
return nil
}
// waitForEtcd wait until etcd is propagated correctly
func (m *EtcdTestServer) waitUntilUp() error {
membersAPI := etcd.NewMembersAPI(m.Client)
for start := time.Now(); time.Since(start) < wait.ForeverTestTimeout; time.Sleep(10 * time.Millisecond) {
members, err := membersAPI.List(context.TODO())
if err != nil {
klog.Errorf("Error when getting etcd cluster members")
continue
}
if len(members) == 1 && len(members[0].ClientURLs) > 0 {
return nil
}
}
return fmt.Errorf("timeout on waiting for etcd cluster")
}
// Terminate will shutdown the running etcd server
func (m *EtcdTestServer) Terminate(t *testing.T) {
if m.v3Cluster != nil {
m.v3Cluster.Terminate(t)
} else {
m.Client = nil
m.s.Stop()
// TODO: This is a pretty ugly hack to workaround races during closing
// in-memory etcd server in unit tests - see #18928 for more details.
// We should get rid of it as soon as we have a proper fix - etcd clients
// have overwritten transport counting opened connections (probably by
// overwriting Dial function) and termination function waiting for all
// connections to be closed and stopping accepting new ones.
time.Sleep(250 * time.Millisecond)
for _, hs := range m.hss {
hs.CloseClientConnections()
hs.Close()
}
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
if len(m.CertificatesDir) > 0 {
if err := os.RemoveAll(m.CertificatesDir); err != nil {
t.Fatal(err)
}
}
}
}
// NewEtcdTestClientServer DEPRECATED creates a new client and server for testing
func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
server := configureTestCluster(t, "foo", true)
err := server.launch(t)
if err != nil {
t.Fatalf("Failed to start etcd server error=%v", err)
return nil
}
cfg := etcd.Config{
Endpoints: server.ClientURLs.StringSlice(),
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
}
server.Client, err = etcd.New(cfg)
if err != nil {
server.Terminate(t)
t.Fatalf("Unexpected error in NewEtcdTestClientServer (%v)", err)
return nil
}
if err := server.waitUntilUp(); err != nil {
server.Terminate(t)
t.Fatalf("Unexpected error in waitUntilUp (%v)", err)
return nil
}
return server
}
// NewEtcd3TestClientServer creates a new client and server for testing
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
server := &EtcdTestServer{
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
}
server.V3Client = server.v3Cluster.RandClient()
config := &storagebackend.Config{
Type: "etcd3",
Prefix: PathPrefix(),
Transport: storagebackend.TransportConfig{
ServerList: server.V3Client.Endpoints(),
},
Paging: true,
}
return server, config
}

View File

@ -17,288 +17,20 @@ limitations under the License.
package testing
import (
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"path"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
"k8s.io/apiserver/pkg/storage/etcd/testing/testingcert"
"k8s.io/apiserver/pkg/storage/storagebackend"
"context"
etcd "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"k8s.io/klog"
)
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
type EtcdTestServer struct {
// The following are lumped etcd2 test server params
// TODO: Deprecate in a post 1.5 release
etcdserver.ServerConfig
PeerListeners, ClientListeners []net.Listener
Client etcd.Client
CertificatesDir string
CertFile string
KeyFile string
CAFile string
raftHandler http.Handler
s *etcdserver.EtcdServer
hss []*httptest.Server
// The following are lumped etcd3 test server params
v3Cluster *integration.ClusterV3
V3Client *clientv3.Client
// Returns the prefix set via the ETCD_PREFIX environment variable (if any).
func PathPrefix() string {
pref := os.Getenv("ETCD_PREFIX")
if pref == "" {
pref = "registry"
}
return path.Join("/", pref)
}
// newLocalListener opens a port localhost using any port
func newLocalListener(t *testing.T) net.Listener {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
return l
}
// newSecuredLocalListener opens a port localhost using any port
// with SSL enable
func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
var l net.Listener
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
tlsInfo := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
tlscfg, err := tlsInfo.ServerConfig()
if err != nil {
t.Fatalf("unexpected serverConfig error: %v", err)
}
l, err = transport.NewKeepAliveListener(l, "https", tlscfg)
if err != nil {
t.Fatal(err)
}
return l
}
func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
tlsInfo := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
tr, err := transport.NewTransport(tlsInfo, time.Second)
if err != nil {
t.Fatal(err)
}
return tr
}
// configureTestCluster will set the params to start an etcd server
func configureTestCluster(t *testing.T, name string, https bool) *EtcdTestServer {
var err error
m := &EtcdTestServer{}
pln := newLocalListener(t)
m.PeerListeners = []net.Listener{pln}
m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()})
if err != nil {
t.Fatal(err)
}
// Allow test launches to control where etcd data goes, for space or performance reasons
baseDir := os.Getenv("TEST_ETCD_DIR")
if len(baseDir) == 0 {
baseDir = os.TempDir()
}
if https {
m.CertificatesDir, err = ioutil.TempDir(baseDir, "etcd_certificates")
if err != nil {
t.Fatal(err)
}
m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
if err = ioutil.WriteFile(m.CertFile, []byte(testingcert.CertFileContent), 0644); err != nil {
t.Fatal(err)
}
m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
if err = ioutil.WriteFile(m.KeyFile, []byte(testingcert.KeyFileContent), 0644); err != nil {
t.Fatal(err)
}
m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
if err = ioutil.WriteFile(m.CAFile, []byte(testingcert.CAFileContent), 0644); err != nil {
t.Fatal(err)
}
cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
m.ClientListeners = []net.Listener{cln}
m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
if err != nil {
t.Fatal(err)
}
} else {
cln := newLocalListener(t)
m.ClientListeners = []net.Listener{cln}
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
if err != nil {
t.Fatal(err)
}
}
m.AuthToken = "simple"
m.Name = name
m.DataDir, err = ioutil.TempDir(baseDir, "etcd")
if err != nil {
t.Fatal(err)
}
clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
t.Fatal(err)
}
m.InitialClusterToken = "TestEtcd"
m.NewCluster = true
m.ForceNewCluster = false
m.ElectionTicks = 10
m.TickMs = uint(10)
return m
}
// launch will attempt to start the etcd server
func (m *EtcdTestServer) launch(t *testing.T) error {
var err error
if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
return fmt.Errorf("failed to initialize the etcd server: %v", err)
}
m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: m.raftHandler},
}
hs.Start()
m.hss = append(m.hss, hs)
}
for _, ln := range m.ClientListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: v2http.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
}
hs.Start()
m.hss = append(m.hss, hs)
}
return nil
}
// waitForEtcd wait until etcd is propagated correctly
func (m *EtcdTestServer) waitUntilUp() error {
membersAPI := etcd.NewMembersAPI(m.Client)
for start := time.Now(); time.Since(start) < wait.ForeverTestTimeout; time.Sleep(10 * time.Millisecond) {
members, err := membersAPI.List(context.TODO())
if err != nil {
klog.Errorf("Error when getting etcd cluster members")
continue
}
if len(members) == 1 && len(members[0].ClientURLs) > 0 {
return nil
}
}
return fmt.Errorf("timeout on waiting for etcd cluster")
}
// Terminate will shutdown the running etcd server
func (m *EtcdTestServer) Terminate(t *testing.T) {
if m.v3Cluster != nil {
m.v3Cluster.Terminate(t)
} else {
m.Client = nil
m.s.Stop()
// TODO: This is a pretty ugly hack to workaround races during closing
// in-memory etcd server in unit tests - see #18928 for more details.
// We should get rid of it as soon as we have a proper fix - etcd clients
// have overwritten transport counting opened connections (probably by
// overwriting Dial function) and termination function waiting for all
// connections to be closed and stopping accepting new ones.
time.Sleep(250 * time.Millisecond)
for _, hs := range m.hss {
hs.CloseClientConnections()
hs.Close()
}
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
if len(m.CertificatesDir) > 0 {
if err := os.RemoveAll(m.CertificatesDir); err != nil {
t.Fatal(err)
}
}
}
}
// NewEtcdTestClientServer DEPRECATED creates a new client and server for testing
func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
server := configureTestCluster(t, "foo", true)
err := server.launch(t)
if err != nil {
t.Fatalf("Failed to start etcd server error=%v", err)
return nil
}
cfg := etcd.Config{
Endpoints: server.ClientURLs.StringSlice(),
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
}
server.Client, err = etcd.New(cfg)
if err != nil {
server.Terminate(t)
t.Fatalf("Unexpected error in NewEtcdTestClientServer (%v)", err)
return nil
}
if err := server.waitUntilUp(); err != nil {
server.Terminate(t)
t.Fatalf("Unexpected error in waitUntilUp (%v)", err)
return nil
}
return server
}
// NewEtcd3TestClientServer creates a new client and server for testing
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
server := &EtcdTestServer{
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
}
server.V3Client = server.v3Cluster.RandClient()
config := &storagebackend.Config{
Type: "etcd3",
Prefix: etcdtest.PathPrefix(),
Transport: storagebackend.TransportConfig{
ServerList: server.V3Client.Endpoints(),
},
Paging: true,
}
return server, config
// Adds the ETCD_PREFIX to the provided key
func AddPrefix(in string) string {
return path.Join(PathPrefix(), in)
}

View File

@ -10,10 +10,6 @@ go_test(
name = "go_default_test",
srcs = ["etcd_util_test.go"],
embed = [":go_default_library"],
deps = [
"//vendor/github.com/coreos/etcd/client:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)
go_library(
@ -24,7 +20,6 @@ go_library(
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd/util",
importpath = "k8s.io/apiserver/pkg/storage/etcd/util",
deps = ["//vendor/github.com/coreos/etcd/client:go_default_library"],
)
filegroup(

View File

@ -19,69 +19,8 @@ package util
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
etcd "github.com/coreos/etcd/client"
)
// IsEtcdNotFound returns true if and only if err is an etcd not found error.
func IsEtcdNotFound(err error) bool {
return isEtcdErrorNum(err, etcd.ErrorCodeKeyNotFound)
}
// IsEtcdNodeExist returns true if and only if err is an etcd node already exist error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, etcd.ErrorCodeNodeExist)
}
// IsEtcdTestFailed returns true if and only if err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, etcd.ErrorCodeTestFailed)
}
// IsEtcdWatchExpired returns true if and only if err indicates the watch has expired.
func IsEtcdWatchExpired(err error) bool {
// NOTE: This seems weird why it wouldn't be etcd.ErrorCodeWatcherCleared
// I'm using the previous matching value
return isEtcdErrorNum(err, etcd.ErrorCodeEventIndexCleared)
}
// IsEtcdUnreachable returns true if and only if err indicates the server could not be reached.
func IsEtcdUnreachable(err error) bool {
// NOTE: The logic has changed previous error code no longer applies
return err == etcd.ErrClusterUnavailable
}
// isEtcdErrorNum returns true if and only if err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
if err != nil {
if etcdError, ok := err.(etcd.Error); ok {
return etcdError.Code == errorCode
}
// NOTE: There are other error types returned
}
return false
}
// GetEtcdVersion performs a version check against the provided Etcd server,
// returning the string response, and error (if any).
func GetEtcdVersion(host string) (string, error) {
response, err := http.Get(host + "/version")
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err)
}
versionBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(versionBytes), nil
}
type etcdHealth struct {
// Note this has to be public so the json library can modify it.
Health string `json:"health"`

View File

@ -17,79 +17,9 @@ limitations under the License.
package util
import (
"fmt"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"
etcd "github.com/coreos/etcd/client"
"github.com/stretchr/testify/assert"
)
const validEtcdVersion = "etcd 2.0.9"
func TestIsEtcdNotFound(t *testing.T) {
try := func(err error, isNotFound bool) {
if IsEtcdNotFound(err) != isNotFound {
t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound)
}
}
try(&etcd.Error{Code: 101}, false)
try(nil, false)
try(fmt.Errorf("some other kind of error"), false)
}
func TestGetEtcdVersion_ValidVersion(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, validEtcdVersion)
}))
defer testServer.Close()
var version string
var err error
if version, err = GetEtcdVersion(testServer.URL); err != nil {
t.Errorf("Unexpected error: %v", err)
}
assert.Equal(t, validEtcdVersion, version, "Unexpected version")
assert.Nil(t, err)
}
func TestGetEtcdVersion_ErrorStatus(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "", http.StatusServiceUnavailable)
}))
defer testServer.Close()
_, err := GetEtcdVersion(testServer.URL)
assert.NotNil(t, err)
}
func TestGetEtcdVersion_NotListening(t *testing.T) {
portIsOpen := func(port int) bool {
conn, err := net.DialTimeout("tcp", "127.0.0.1:"+strconv.Itoa(port), 1*time.Second)
if err == nil {
conn.Close()
return true
}
return false
}
port := rand.Intn((1 << 16) - 1)
for tried := 0; portIsOpen(port); tried++ {
if tried >= 10 {
t.Fatal("Couldn't find a closed TCP port to continue testing")
}
port++
}
_, err := GetEtcdVersion("http://127.0.0.1:" + strconv.Itoa(port))
assert.NotNil(t, err)
}
func TestEtcdHealthCheck(t *testing.T) {
tests := []struct {
data string

View File

@ -9,6 +9,7 @@ load(
go_test(
name = "go_default_test",
srcs = [
"api_object_versioner_test.go",
"compact_test.go",
"event_test.go",
"lease_manager_test.go",
@ -33,8 +34,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/tests:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
@ -52,6 +52,7 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"api_object_versioner.go",
"compact.go",
"errors.go",
"event.go",
@ -68,11 +69,11 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
@ -94,6 +95,7 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/preflight:all-srcs",
],
tags = ["automanaged"],

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
package etcd3
import (
"strconv"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
package etcd3
import (
"testing"

View File

@ -8,8 +8,8 @@ load(
go_library(
name = "go_default_library",
srcs = ["metrics.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd/metrics",
importpath = "k8s.io/apiserver/pkg/storage/etcd/metrics",
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/metrics",
importpath = "k8s.io/apiserver/pkg/storage/etcd3/metrics",
deps = ["//vendor/github.com/prometheus/client_golang/prometheus:go_default_library"],
)

View File

@ -62,15 +62,18 @@ func Register() {
})
}
// UpdateObjectCount sets the etcd_object_counts metric.
func UpdateObjectCount(resourcePrefix string, count int64) {
objectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
}
// RecordEtcdRequestLatency sets the etcd_request_duration_seconds metrics.
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
deprecatedEtcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(sinceInMicroseconds(startTime))
}
// Reset resets the etcd_request_duration_seconds metric.
func Reset() {
etcdRequestLatency.Reset()

View File

@ -38,8 +38,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/etcd/metrics"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utiltrace "k8s.io/utils/trace"
@ -89,7 +88,7 @@ func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer val
}
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := etcd.APIObjectVersioner{}
versioner := APIObjectVersioner{}
result := &store{
client: c,
codec: codec,

View File

@ -46,8 +46,7 @@ import (
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd"
storagetests "k8s.io/apiserver/pkg/storage/tests"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
@ -693,13 +692,13 @@ func TestTransformationFailure(t *testing.T) {
key: "/one-level/test",
obj: &example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: storagetests.DeepEqualSafePodSpec(),
Spec: storagetesting.DeepEqualSafePodSpec(),
},
}, {
key: "/two-level/1/test",
obj: &example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: storagetests.DeepEqualSafePodSpec(),
Spec: storagetesting.DeepEqualSafePodSpec(),
},
}}
for i, ps := range preset[:1] {
@ -1281,7 +1280,7 @@ func TestListInconsistentContinuation(t *testing.T) {
}
// compact to latest revision.
versioner := etcd.APIObjectVersioner{}
versioner := APIObjectVersioner{}
lastRVString := preset[2].storedObj.ResourceVersion
lastRV, err := versioner.ParseResourceVersion(lastRVString)
if err != nil {

View File

@ -19,6 +19,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
],
)

View File

@ -22,15 +22,16 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/storage"
)
// CreateObj will create a single object using the storage interface
// CreateObj will create a single object using the storage interface.
func CreateObj(helper storage.Interface, name string, obj, out runtime.Object, ttl uint64) error {
return helper.Create(context.TODO(), name, obj, out, ttl)
}
//CreateObjList will create a list from the array of objects
// CreateObjList will create a list from the array of objects.
func CreateObjList(prefix string, helper storage.Interface, items []runtime.Object) error {
for i := range items {
obj := items[i]
@ -47,7 +48,7 @@ func CreateObjList(prefix string, helper storage.Interface, items []runtime.Obje
return nil
}
// CreateList will properly create a list using the storage interface
// CreateList will properly create a list using the storage interface.
func CreateList(prefix string, helper storage.Interface, list runtime.Object) error {
items, err := meta.ExtractList(list)
if err != nil {
@ -59,3 +60,13 @@ func CreateList(prefix string, helper storage.Interface, list runtime.Object) er
}
return meta.SetList(list, items)
}
// DeepEqualSafePodSpec returns an example.PodSpec safe for deep-equal operations.
func DeepEqualSafePodSpec() example.PodSpec {
grace := int64(30)
return example.PodSpec{
RestartPolicy: "Always",
TerminationGracePeriodSeconds: &grace,
SchedulerName: "default-scheduler",
}
}

View File

@ -1,15 +1,10 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_test(
name = "go_default_test",
srcs = ["cacher_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/apitesting:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
@ -29,24 +24,15 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/etcdtest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = ["utils.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/tests",
importpath = "k8s.io/apiserver/pkg/storage/tests",
deps = ["//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),

View File

@ -43,10 +43,9 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/apiserver/pkg/storage/etcd3"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
@ -102,7 +101,7 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServe
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner) {
prefix := "pods"
v := etcdstorage.APIObjectVersioner{}
v := etcd3.APIObjectVersioner{}
config := cacherstorage.Config{
CacheCapacity: cap,
Storage: s,
@ -120,7 +119,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage
func makeTestPod(name string) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
Spec: DeepEqualSafePodSpec(),
Spec: storagetesting.DeepEqualSafePodSpec(),
}
}
@ -147,7 +146,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl
}
func TestGet(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -178,7 +177,7 @@ func TestGet(t *testing.T) {
}
func TestGetToList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -234,7 +233,7 @@ func TestGetToList(t *testing.T) {
}
func TestList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -315,7 +314,7 @@ func TestList(t *testing.T) {
}
func TestInfiniteList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -369,7 +368,7 @@ func (self *injectListError) List(ctx context.Context, key string, resourceVersi
}
func TestWatch(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
// Inject one list error to make sure we test the relist case.
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
defer server.Terminate(t)
@ -446,7 +445,7 @@ func TestWatch(t *testing.T) {
}
func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -488,7 +487,7 @@ func TestWatcherTimeout(t *testing.T) {
}
func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -550,7 +549,7 @@ func TestFiltering(t *testing.T) {
}
func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -598,7 +597,7 @@ func TestStartingResourceVersion(t *testing.T) {
}
func TestEmptyWatchEventCache(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
// add a few objects
@ -662,7 +661,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
}
func TestRandomWatchDeliver(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -788,7 +787,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
func TestWatchDispatchBookmarkEvents(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -850,7 +849,7 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) {
func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()

View File

@ -1,30 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tests
import (
"k8s.io/apiserver/pkg/apis/example"
)
func DeepEqualSafePodSpec() example.PodSpec {
grace := int64(30)
return example.PodSpec{
RestartPolicy: "Always",
TerminationGracePeriodSeconds: &grace,
SchedulerName: "default-scheduler",
}
}

4
vendor/modules.txt vendored
View File

@ -1260,13 +1260,11 @@ k8s.io/apiserver/pkg/server/storage
k8s.io/apiserver/pkg/storage
k8s.io/apiserver/pkg/storage/cacher
k8s.io/apiserver/pkg/storage/errors
k8s.io/apiserver/pkg/storage/etcd
k8s.io/apiserver/pkg/storage/etcd/etcdtest
k8s.io/apiserver/pkg/storage/etcd/metrics
k8s.io/apiserver/pkg/storage/etcd/testing
k8s.io/apiserver/pkg/storage/etcd/testing/testingcert
k8s.io/apiserver/pkg/storage/etcd/util
k8s.io/apiserver/pkg/storage/etcd3
k8s.io/apiserver/pkg/storage/etcd3/metrics
k8s.io/apiserver/pkg/storage/etcd3/preflight
k8s.io/apiserver/pkg/storage/names
k8s.io/apiserver/pkg/storage/storagebackend