From cb87793d57f59b9ead730fb1e4ed6cfa38c189f2 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Wed, 8 Apr 2020 15:46:07 +0200 Subject: [PATCH 1/2] Add unit test to portallocator storage Add unit test for the portallocator storage based on the ipallocator ones. pkg/registry/core/service/ipallocator/storage/storage_test.go --- pkg/registry/core/service/portallocator/BUILD | 1 + .../core/service/portallocator/storage/BUILD | 45 +++++ .../service/portallocator/storage/storage.go | 19 ++ .../portallocator/storage/storage_test.go | 184 ++++++++++++++++++ 4 files changed, 249 insertions(+) create mode 100644 pkg/registry/core/service/portallocator/storage/BUILD create mode 100644 pkg/registry/core/service/portallocator/storage/storage.go create mode 100644 pkg/registry/core/service/portallocator/storage/storage_test.go diff --git a/pkg/registry/core/service/portallocator/BUILD b/pkg/registry/core/service/portallocator/BUILD index f2251bf9c83..25822789099 100644 --- a/pkg/registry/core/service/portallocator/BUILD +++ b/pkg/registry/core/service/portallocator/BUILD @@ -47,6 +47,7 @@ filegroup( srcs = [ ":package-srcs", "//pkg/registry/core/service/portallocator/controller:all-srcs", + "//pkg/registry/core/service/portallocator/storage:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/registry/core/service/portallocator/storage/BUILD b/pkg/registry/core/service/portallocator/storage/BUILD new file mode 100644 index 00000000000..98ce69e0658 --- /dev/null +++ b/pkg/registry/core/service/portallocator/storage/BUILD @@ -0,0 +1,45 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["storage_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/apis/core:go_default_library", + "//pkg/apis/core/install:go_default_library", + "//pkg/registry/core/service/allocator:go_default_library", + "//pkg/registry/core/service/allocator/storage:go_default_library", + "//pkg/registry/core/service/portallocator:go_default_library", + "//pkg/registry/registrytest:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = ["storage.go"], + importpath = "k8s.io/kubernetes/pkg/registry/core/service/portallocator/storage", +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/registry/core/service/portallocator/storage/storage.go b/pkg/registry/core/service/portallocator/storage/storage.go new file mode 100644 index 00000000000..8b40a8ba012 --- /dev/null +++ b/pkg/registry/core/service/portallocator/storage/storage.go @@ -0,0 +1,19 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +// Keep CI happy; it is unhappy if a directory only contains tests diff --git a/pkg/registry/core/service/portallocator/storage/storage_test.go b/pkg/registry/core/service/portallocator/storage/storage_test.go new file mode 100644 index 00000000000..a86c1ddf4c2 --- /dev/null +++ b/pkg/registry/core/service/portallocator/storage/storage_test.go @@ -0,0 +1,184 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "context" + "fmt" + "strings" + "testing" + + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/storage" + etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/storagebackend/factory" + api "k8s.io/kubernetes/pkg/apis/core" + _ "k8s.io/kubernetes/pkg/apis/core/install" + "k8s.io/kubernetes/pkg/registry/core/service/allocator" + allocatorstore "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage" + "k8s.io/kubernetes/pkg/registry/core/service/portallocator" + "k8s.io/kubernetes/pkg/registry/registrytest" +) + +const ( + basePortRange = 30000 + sizePortRange = 2768 +) + +func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Interface, allocator.Interface, storage.Interface, factory.DestroyFunc) { + etcdStorage, server := registrytest.NewEtcdStorage(t, "") + + serviceNodePortRange := utilnet.PortRange{Base: basePortRange, Size: sizePortRange} + var backing allocator.Interface + storage, err := portallocator.NewPortAllocatorCustom(serviceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) { + mem := allocator.NewAllocationMap(max, rangeSpec) + backing = mem + etcd, err := allocatorstore.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), etcdStorage) + if err != nil { + return nil, err + } + return etcd, nil + }) + if err != nil { + t.Fatalf("unexpected error creating etcd: %v", err) + } + s, d, err := generic.NewRawStorage(etcdStorage) + if err != nil { + t.Fatalf("Couldn't create storage: %v", err) + } + destroyFunc := func() { + d() + server.Terminate(t) + } + return server, storage, backing, s, destroyFunc +} + +func validNewRangeAllocation() *api.RangeAllocation { + portRange := fmt.Sprintf("%d-%d", basePortRange, basePortRange+sizePortRange-1) + return &api.RangeAllocation{ + Range: portRange, + } +} + +func key() string { + return "/ranges/servicenodeports" +} + +// TestEmpty fails to allocate ports if the storage wasn't initialized with a servicenodeport range +func TestEmpty(t *testing.T) { + _, storage, _, _, destroyFunc := 
newStorage(t)
+	defer destroyFunc()
+	if err := storage.Allocate(31000); !strings.Contains(err.Error(), "cannot allocate resources of type servicenodeportallocations at this time") {
+		t.Fatal(err)
+	}
+}
+
+// TestAllocate tests allocations inside and outside the valid port range
+func TestAllocate(t *testing.T) {
+	_, storage, _, si, destroyFunc := newStorage(t)
+	defer destroyFunc()
+	if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	tests := []struct {
+		name   string
+		port   int
+		errMsg string
+	}{
+		{
+			name:   "Allocate base port",
+			port:   basePortRange,
+			errMsg: "",
+		},
+		{
+			name:   "Allocate maximum from the port range",
+			port:   basePortRange + sizePortRange - 1,
+			errMsg: "",
+		},
+		{
+			name:   "Allocate invalid port: base port minus 1",
+			port:   basePortRange - 1,
+			errMsg: fmt.Sprintf("provided port is not in the valid range. The range of valid ports is %d-%d", basePortRange, basePortRange+sizePortRange-1),
+		},
+		{
+			name:   "Allocate invalid port: maximum port from the port range plus 1",
+			port:   basePortRange + sizePortRange,
+			errMsg: fmt.Sprintf("provided port is not in the valid range. The range of valid ports is %d-%d", basePortRange, basePortRange+sizePortRange-1),
+		},
+		{
+			name:   "Allocate invalid port",
+			port:   -2,
+			errMsg: fmt.Sprintf("provided port is not in the valid range. The range of valid ports is %d-%d", basePortRange, basePortRange+sizePortRange-1),
+		},
+	}
+	for _, tt := range tests {
+		tt := tt // NOTE: https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables
+		t.Run(tt.name, func(t *testing.T) {
+			err := storage.Allocate(tt.port)
+			if (err == nil) != (tt.errMsg == "") {
+				t.Fatalf("Error expected %v, received %v", tt.errMsg, err)
+			}
+			if err != nil && err.Error() != tt.errMsg {
+				t.Fatalf("Error message expected %v, received %v", tt.errMsg, err)
+			}
+		})
+	}
+
+}
+
+// TestReallocate tests that an already allocated port cannot be allocated again until it is released
+func TestReallocate(t *testing.T) {
+	_, storage, backing, si, destroyFunc := newStorage(t)
+	defer destroyFunc()
+	if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate a port inside the valid port range
+	if err := storage.Allocate(30100); err != nil {
+		t.Fatal(err)
+	}
+
+	// Try to allocate the same port in the local bitmap
+	// The local bitmap stores the offset of the port
+	// offset = port - base (30100 - 30000 = 100)
+	ok, err := backing.Allocate(100)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// It should not allocate the port because it was already allocated
+	if ok {
+		t.Fatal("Expected allocation to fail")
+	}
+	// Trying to allocate the same port again should fail
+	if err := storage.Allocate(30100); err != portallocator.ErrAllocated {
+		t.Fatal(err)
+	}
+
+	// Release the port
+	if err := storage.Release(30100); err != nil {
+		t.Fatal(err)
+	}
+
+	// Allocating the port again should succeed because we've released it
+	if err := storage.Allocate(30100); err != nil {
+		t.Fatal(err)
+	}
+
+}
From e3df13439a72888cd83ed9896359d75e1787d2fc Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Wed, 8 Apr 2020 15:54:52 +0200
Subject: [PATCH 2/2] fix service allocation concurrency issues

The service allocator is used to allocate IP addresses for the Service
IP allocator and NodePorts for the Service NodePort allocator.
It uses a bitmap backed by etcd to store the allocations, but it
performs the Allocate and Release operations on the local memory first
instead of against etcd, which can cause issues in environments with
high concurrency.

In deployments with multiple apiservers the resource allocation
information may go out of sync; this is more noticeable with NodePorts.
For example:

1. apiserver A creates a service with NodePort X
2. apiserver B deletes the service
3. apiserver A creates the service again

If the allocation data of apiserver A wasn't refreshed after the
deletion done by apiserver B, apiserver A fails the allocation because
its data is out of sync. The Repair loops solve the problem eventually,
but some use cases require better concurrency in the allocation logic.

Instead of performing the Allocate and Release operations locally
first, check whether the local data is up to date with etcd and
operate on the most recent version of the data.
---
 .../core/service/allocator/storage/storage.go | 54 +++++-------
 .../service/allocator/storage/storage_test.go | 56 +++++++++++++
 test/integration/master/BUILD                 |  1 +
 .../integration/master/kube_apiserver_test.go | 82 +++++++++++++++++++
 4 files changed, 161 insertions(+), 32 deletions(-)

diff --git a/pkg/registry/core/service/allocator/storage/storage.go b/pkg/registry/core/service/allocator/storage/storage.go
index be93bbee34d..bd26d025c80 100644
--- a/pkg/registry/core/service/allocator/storage/storage.go
+++ b/pkg/registry/core/service/allocator/storage/storage.go
@@ -80,17 +80,12 @@ func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.Grou
 	}, nil
 }
 
-// Allocate attempts to allocate the item locally and then in etcd.
+// Allocate attempts to allocate the item.
 func (e *Etcd) Allocate(offset int) (bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	ok, err := e.alloc.Allocate(offset)
-	if !ok || err != nil {
-		return ok, err
-	}
-
-	err = e.tryUpdate(func() error {
+	err := e.tryUpdate(func() error {
 		ok, err := e.alloc.Allocate(offset)
 		if err != nil {
 			return err
@@ -109,49 +104,44 @@ func (e *Etcd) Allocate(offset int) (bool, error) {
 	return true, nil
 }
 
-// AllocateNext attempts to allocate the next item locally and then in etcd.
+// AllocateNext attempts to allocate the next item.
 func (e *Etcd) AllocateNext() (int, bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
-
-	offset, ok, err := e.alloc.AllocateNext()
-	if !ok || err != nil {
-		return offset, ok, err
-	}
+	var offset int
+	var ok bool
+	var err error
 
 	err = e.tryUpdate(func() error {
-		ok, err := e.alloc.Allocate(offset)
+		// update the offset here
+		offset, ok, err = e.alloc.AllocateNext()
 		if err != nil {
 			return err
 		}
 		if !ok {
-			// update the offset here
-			offset, ok, err = e.alloc.AllocateNext()
-			if err != nil {
-				return err
-			}
-			if !ok {
-				return errorUnableToAllocate
-			}
-			return nil
+			return errorUnableToAllocate
 		}
 		return nil
 	})
-	return offset, ok, err
+
+	if err != nil {
+		if err == errorUnableToAllocate {
+			return offset, false, nil
+		}
+		return offset, false, err
+	}
+	return offset, true, nil
 }
 
-// Release attempts to release the provided item locally and then in etcd.
+// Release attempts to release the provided item.
 func (e *Etcd) Release(item int) error {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	if err := e.alloc.Release(item); err != nil {
-		return err
-	}
-
 	return e.tryUpdate(func() error {
 		return e.alloc.Release(item)
 	})
+
 }
 
 func (e *Etcd) ForEach(fn func(int)) {
@@ -172,9 +162,9 @@ func (e *Etcd) tryUpdate(fn func() error) error {
 			if err := e.alloc.Restore(existing.Range, existing.Data); err != nil {
 				return nil, err
 			}
-			if err := fn(); err != nil {
-				return nil, err
-			}
+		}
+		if err := fn(); err != nil {
+			return nil, err
 		}
 		e.last = existing.ResourceVersion
 		rangeSpec, data := e.alloc.Snapshot()
diff --git a/pkg/registry/core/service/allocator/storage/storage_test.go b/pkg/registry/core/service/allocator/storage/storage_test.go
index a4a24f8b299..7c3b0d1f211 100644
--- a/pkg/registry/core/service/allocator/storage/storage_test.go
+++ b/pkg/registry/core/service/allocator/storage/storage_test.go
@@ -102,3 +102,59 @@ func TestStore(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// Test that an item which is allocated in the storage but released in the
+// local bitmap cannot be allocated again: it looks free locally,
+// but it is still allocated in the storage
+func TestAllocatedStorageButReleasedLocally(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the storage
+	if _, err := storage.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// Release the item in the local bitmap only,
+	// emulating that it is out of sync with the storage
+	err := backing.Release(2)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Allocating it should fail: despite being free in the
+	// local bitmap, it is still allocated in the storage
+	ok, err := storage.Allocate(2)
+	if ok || err != nil {
+		t.Fatal(err)
+	}
+
+}
+
+// Test that an item which is free in the storage but allocated in the
+// local bitmap can be allocated: it looks allocated locally,
+// but it is free in the storage
+func TestAllocatedLocallyButReleasedStorage(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the local bitmap only, not in the storage,
+	// emulating that it is out of sync with the storage
+	if _, err := backing.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// It should be able to allocate the item
+	// because it is free in the storage
+	ok, err := storage.Allocate(2)
+	if !ok || err != nil {
+		t.Fatal(err)
+	}
+
+}
diff --git a/test/integration/master/BUILD b/test/integration/master/BUILD
index a646170f36d..a30bda253bd 100644
--- a/test/integration/master/BUILD
+++ b/test/integration/master/BUILD
@@ -40,6 +40,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
diff --git a/test/integration/master/kube_apiserver_test.go 
b/test/integration/master/kube_apiserver_test.go
index 27f6be71e08..83c0aee4fe2 100644
--- a/test/integration/master/kube_apiserver_test.go
+++ b/test/integration/master/kube_apiserver_test.go
@@ -35,6 +35,7 @@ import (
 	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/registry/generic/registry"
 	"k8s.io/client-go/kubernetes"
@@ -494,3 +495,84 @@ func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) {
 func TestReconcilerMasterLeaseMultiCombined(t *testing.T) {
 	testReconcilersMasterLease(t, 3, 3)
 }
+
+func TestMultiMasterNodePortAllocation(t *testing.T) {
+	var kubeAPIServers []*kubeapiservertesting.TestServer
+	var clientAPIServers []*kubernetes.Clientset
+	etcd := framework.SharedEtcd()
+
+	instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{
+		DisableStorageCleanup: true,
+	}
+
+	// cleanup the registry storage
+	defer registry.CleanupStorage()
+
+	// create 2 api servers and 2 clients
+	for i := 0; i < 2; i++ {
+		// start one api server
+		t.Logf("starting api server: %d", i)
+		server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
+			"--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
+		}, etcd)
+		kubeAPIServers = append(kubeAPIServers, server)
+
+		// verify kube API servers have registered and create a client
+		if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
+			client, err := kubernetes.NewForConfig(kubeAPIServers[i].ClientConfig)
+			if err != nil {
+				t.Logf("create client error: %v", err)
+				return false, nil
+			}
+			clientAPIServers = append(clientAPIServers, client)
+			endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
+			if err != nil {
+				t.Logf("error fetching endpoints: %v", err)
+				return false, nil
+			}
+			return verifyEndpointsWithIPs(kubeAPIServers, getEndpointIPs(endpoints)), nil
+		}); err != nil {
+			t.Fatalf("did not find only lease endpoints: %v", err)
+		}
+	}
+
+	serviceObject := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels: map[string]string{"foo": "bar"},
+			Name:   "test-node-port",
+		},
+		Spec: corev1.ServiceSpec{
+			Ports: []corev1.ServicePort{
+				{
+					Name:       "nodeport-test",
+					Port:       443,
+					TargetPort: intstr.IntOrString{IntVal: 443},
+					NodePort:   32080,
+					Protocol:   "TCP",
+				},
+			},
+			Type:     "NodePort",
+			Selector: map[string]string{"foo": "bar"},
+		},
+	}
+
+	// create and delete the same NodePort service using different API servers
+	// to check that they share the same port allocation bitmap
+	for i := 0; i < 2; i++ {
+		// Create the service using the first API server
+		_, err := clientAPIServers[0].CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), serviceObject, metav1.CreateOptions{})
+		if err != nil {
+			t.Fatalf("unable to create service: %v", err)
+		}
+		// Delete the service using the second API server
+		if err := clientAPIServers[1].CoreV1().Services(metav1.NamespaceDefault).Delete(context.TODO(), serviceObject.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
+			t.Fatalf("got unexpected error: %v", err)
+		}
+	}
+
+	// shutdown the api servers
+	for _, server := range kubeAPIServers {
+		server.TearDownFn()
+	}
+
+}