Merge pull request #89937 from aojea/portAllocator2

portAllocator sync local data before allocate
commit 342bcf55e8
Kubernetes Prow Robot, 2020-06-18 19:03:10 -07:00 (committed by GitHub)
8 changed files with 410 additions and 32 deletions
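
The bug being fixed: Allocate, AllocateNext, and Release mutated the local bitmap first and only then wrote to etcd, so an API server whose bitmap had drifted from storage (for example, in a multi-master setup where another API server released a port) could hand out stale answers. The fix moves every mutation inside tryUpdate, which restores the local bitmap from the latest stored snapshot before applying it. A minimal, self-contained sketch of that pattern (illustrative names only, not the Kubernetes types):

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errUnableToAllocate = errors.New("unable to allocate")

// store stands in for the persisted RangeAllocation snapshot in etcd.
type store struct {
    mu   sync.Mutex
    bits map[int]bool
}

// bitmap stands in for one API server's in-memory allocator.
type bitmap struct {
    bits map[int]bool
}

// restore rebuilds the local bitmap from the stored snapshot.
func (b *bitmap) restore(snapshot map[int]bool) {
    b.bits = map[int]bool{}
    for k := range snapshot {
        b.bits[k] = true
    }
}

// allocate claims offset locally; it reports false if it is already taken.
func (b *bitmap) allocate(offset int) bool {
    if b.bits[offset] {
        return false
    }
    b.bits[offset] = true
    return true
}

// tryUpdate mirrors the read-restore-mutate-write flow the PR enforces:
// sync local data from storage, then apply the mutation, then persist.
func tryUpdate(s *store, b *bitmap, fn func() error) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    b.restore(s.bits) // sync local data before allocating
    if err := fn(); err != nil {
        return err
    }
    s.bits = b.bits // persist the updated snapshot
    return nil
}

func main() {
    // Offset 2 is already allocated in storage (say, by another API server),
    // but this server's local bitmap is stale and thinks it is free.
    s := &store{bits: map[int]bool{2: true}}
    b := &bitmap{bits: map[int]bool{}}

    err := tryUpdate(s, b, func() error {
        if !b.allocate(2) {
            return errUnableToAllocate
        }
        return nil
    })
    fmt.Println(err) // unable to allocate: storage, not the stale bitmap, wins
}

Before the fix, the equivalent flow allocated in the stale local bitmap first and could return success before etcd ever saw the request.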


@@ -80,17 +80,12 @@ func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.Grou
 	}, nil
 }
 
-// Allocate attempts to allocate the item locally and then in etcd.
+// Allocate attempts to allocate the item.
 func (e *Etcd) Allocate(offset int) (bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	ok, err := e.alloc.Allocate(offset)
-	if !ok || err != nil {
-		return ok, err
-	}
-
-	err = e.tryUpdate(func() error {
+	err := e.tryUpdate(func() error {
 		ok, err := e.alloc.Allocate(offset)
 		if err != nil {
 			return err
@@ -109,49 +104,44 @@ func (e *Etcd) Allocate(offset int) (bool, error) {
 	return true, nil
 }
 
-// AllocateNext attempts to allocate the next item locally and then in etcd.
+// AllocateNext attempts to allocate the next item.
 func (e *Etcd) AllocateNext() (int, bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	offset, ok, err := e.alloc.AllocateNext()
-	if !ok || err != nil {
-		return offset, ok, err
-	}
-
+	var offset int
+	var ok bool
+	var err error
 	err = e.tryUpdate(func() error {
-		ok, err := e.alloc.Allocate(offset)
+		// update the offset here
+		offset, ok, err = e.alloc.AllocateNext()
 		if err != nil {
 			return err
 		}
 		if !ok {
-			// update the offset here
-			offset, ok, err = e.alloc.AllocateNext()
-			if err != nil {
-				return err
-			}
-			if !ok {
-				return errorUnableToAllocate
-			}
-			return nil
+			return errorUnableToAllocate
 		}
 		return nil
 	})
-	return offset, ok, err
+
+	if err != nil {
+		if err == errorUnableToAllocate {
+			return offset, false, nil
+		}
+		return offset, false, err
+	}
+	return offset, true, nil
 }
 
-// Release attempts to release the provided item locally and then in etcd.
+// Release attempts to release the provided item.
 func (e *Etcd) Release(item int) error {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	if err := e.alloc.Release(item); err != nil {
-		return err
-	}
-
 	return e.tryUpdate(func() error {
 		return e.alloc.Release(item)
 	})
 }
 
 func (e *Etcd) ForEach(fn func(int)) {
@@ -172,9 +162,9 @@ func (e *Etcd) tryUpdate(fn func() error) error {
 			if err := e.alloc.Restore(existing.Range, existing.Data); err != nil {
 				return nil, err
 			}
-			if err := fn(); err != nil {
-				return nil, err
-			}
 		}
+		if err := fn(); err != nil {
+			return nil, err
+		}
 		e.last = existing.ResourceVersion
 		rangeSpec, data := e.alloc.Snapshot()
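
One subtlety in the new AllocateNext above: errorUnableToAllocate is a sentinel returned from inside the update closure when the range is exhausted, and the caller translates it into (offset, false, nil) so that a full range reads as "not allocated" rather than as an error. A runnable sketch of just that translation (standalone, not the package's code):

package main

import (
    "errors"
    "fmt"
)

var errorUnableToAllocate = errors.New("unable to allocate")

// translate mirrors the error handling added at the end of AllocateNext:
// the sentinel becomes a plain "not ok" result instead of an error.
func translate(offset int, err error) (int, bool, error) {
    if err != nil {
        if err == errorUnableToAllocate {
            return offset, false, nil
        }
        return offset, false, err
    }
    return offset, true, nil
}

func main() {
    fmt.Println(translate(0, errorUnableToAllocate)) // 0 false <nil>
    fmt.Println(translate(7, nil))                   // 7 true <nil>
}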


@@ -103,3 +103,59 @@ func TestStore(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// Test that an item which is allocated in storage but not in the local
+// bitmap fails to allocate: even though it is free locally, it is taken
+// in the storage.
+func TestAllocatedStorageButReleasedLocally(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the storage
+	if _, err := storage.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// Release the item in the local bitmap only,
+	// emulating a bitmap that is out of sync with the storage
+	err := backing.Release(2)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Allocating it again should fail despite it being free in the
+	// local bitmap, because it is still allocated in the storage
+	ok, err := storage.Allocate(2)
+	if ok || err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Test that an item which is free in storage but allocated in the local
+// bitmap can still be allocated: even though it is taken locally, it is
+// free in the storage.
+func TestAllocatedLocallyButReleasedStorage(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the local bitmap only, not in the storage,
+	// emulating a bitmap that is out of sync with the storage
+	if _, err := backing.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// It should be able to allocate the item,
+	// because it is free in the storage
+	ok, err := storage.Allocate(2)
+	if !ok || err != nil {
+		t.Fatal(err)
+	}
+}
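
Taken together, these two tests pin down the new rule: storage, not the local bitmap, is the source of truth. A compressed model of the asserted outcomes (hypothetical helper, not the package API):

package main

import "fmt"

// allocatable mimics the outcome the two tests above assert: after the local
// bitmap is resynced from storage, allocation succeeds iff the item is free
// in storage; the local bitmap's stale opinion never decides.
func allocatable(freeInStorage, freeLocally bool) bool {
    return freeInStorage
}

func main() {
    fmt.Println(allocatable(false, true)) // TestAllocatedStorageButReleasedLocally: not allocatable
    fmt.Println(allocatable(true, false)) // TestAllocatedLocallyButReleasedStorage: allocatable
}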


@@ -47,6 +47,7 @@ filegroup(
     srcs = [
         ":package-srcs",
         "//pkg/registry/core/service/portallocator/controller:all-srcs",
+        "//pkg/registry/core/service/portallocator/storage:all-srcs",
     ],
     tags = ["automanaged"],
 )


@@ -0,0 +1,45 @@
package(default_visibility = ["//visibility:public"])

load(
    "@io_bazel_rules_go//go:def.bzl",
    "go_library",
    "go_test",
)

go_test(
    name = "go_default_test",
    srcs = ["storage_test.go"],
    embed = [":go_default_library"],
    deps = [
        "//pkg/apis/core:go_default_library",
        "//pkg/apis/core/install:go_default_library",
        "//pkg/registry/core/service/allocator:go_default_library",
        "//pkg/registry/core/service/allocator/storage:go_default_library",
        "//pkg/registry/core/service/portallocator:go_default_library",
        "//pkg/registry/registrytest:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
        "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
        "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
        "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
        "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
    ],
)

go_library(
    name = "go_default_library",
    srcs = ["storage.go"],
    importpath = "k8s.io/kubernetes/pkg/registry/core/service/portallocator/storage",
)

filegroup(
    name = "package-srcs",
    srcs = glob(["**"]),
    tags = ["automanaged"],
    visibility = ["//visibility:private"],
)

filegroup(
    name = "all-srcs",
    srcs = [":package-srcs"],
    tags = ["automanaged"],
)


@@ -0,0 +1,19 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

// Keep CI happy; it is unhappy if a directory only contains tests


@@ -0,0 +1,184 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
	"context"
	"fmt"
	"strings"
	"testing"

	utilnet "k8s.io/apimachinery/pkg/util/net"
	"k8s.io/apiserver/pkg/registry/generic"
	"k8s.io/apiserver/pkg/storage"
	etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
	"k8s.io/apiserver/pkg/storage/storagebackend/factory"
	api "k8s.io/kubernetes/pkg/apis/core"
	_ "k8s.io/kubernetes/pkg/apis/core/install"
	"k8s.io/kubernetes/pkg/registry/core/service/allocator"
	allocatorstore "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
	"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
	"k8s.io/kubernetes/pkg/registry/registrytest"
)

const (
	basePortRange = 30000
	sizePortRange = 2768
)
func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Interface, allocator.Interface, storage.Interface, factory.DestroyFunc) {
	etcdStorage, server := registrytest.NewEtcdStorage(t, "")
	serviceNodePortRange := utilnet.PortRange{Base: basePortRange, Size: sizePortRange}
	var backing allocator.Interface
	storage, err := portallocator.NewPortAllocatorCustom(serviceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
		mem := allocator.NewAllocationMap(max, rangeSpec)
		backing = mem
		etcd, err := allocatorstore.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), etcdStorage)
		if err != nil {
			return nil, err
		}
		return etcd, nil
	})
	if err != nil {
		t.Fatalf("unexpected error creating etcd: %v", err)
	}
	s, d, err := generic.NewRawStorage(etcdStorage)
	if err != nil {
		t.Fatalf("Couldn't create storage: %v", err)
	}
	destroyFunc := func() {
		d()
		server.Terminate(t)
	}
	return server, storage, backing, s, destroyFunc
}

func validNewRangeAllocation() *api.RangeAllocation {
	portRange := fmt.Sprintf("%d-%d", basePortRange, basePortRange+sizePortRange-1)
	return &api.RangeAllocation{
		Range: portRange,
	}
}

func key() string {
	return "/ranges/servicenodeports"
}
// TestEmpty fails to allocate ports if the storage wasn't initialized
// with a servicenodeport range
func TestEmpty(t *testing.T) {
	_, storage, _, _, destroyFunc := newStorage(t)
	defer destroyFunc()
	if err := storage.Allocate(31000); !strings.Contains(err.Error(), "cannot allocate resources of type servicenodeportallocations at this time") {
		t.Fatal(err)
	}
}

// TestAllocate verifies that ports inside the valid range can be
// allocated and that ports outside it are rejected
func TestAllocate(t *testing.T) {
	_, storage, _, si, destroyFunc := newStorage(t)
	defer destroyFunc()
	if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	tests := []struct {
		name   string
		port   int
		errMsg string
	}{
		{
			name:   "Allocate base port",
			port:   basePortRange,
			errMsg: "",
		},
		{
			name:   "Allocate maximum from the port range",
			port:   basePortRange + sizePortRange - 1,
			errMsg: "",
		},
		{
			name:   "Allocate invalid port: base port minus 1",
			port:   basePortRange - 1,
			errMsg: fmt.Sprintf("provided port is not in the valid range. The range of valid ports is %d-%d", basePortRange, basePortRange+sizePortRange-1),
		},
		{
			name:   "Allocate invalid port: maximum port from the port range plus 1",
			port:   basePortRange + sizePortRange,
			errMsg: fmt.Sprintf("provided port is not in the valid range. The range of valid ports is %d-%d", basePortRange, basePortRange+sizePortRange-1),
		},
		{
			name:   "Allocate invalid port",
			port:   -2,
			errMsg: fmt.Sprintf("provided port is not in the valid range. The range of valid ports is %d-%d", basePortRange, basePortRange+sizePortRange-1),
		},
	}
	for _, tt := range tests {
		tt := tt // NOTE: https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables
		t.Run(tt.name, func(t *testing.T) {
			err := storage.Allocate(tt.port)
			if (err == nil) != (tt.errMsg == "") {
				t.Fatalf("Error expected %v, received %v", tt.errMsg, err)
			}
			if err != nil && err.Error() != tt.errMsg {
				t.Fatalf("Error message expected %v, received %v", tt.errMsg, err)
			}
		})
	}
}
// TestReallocate tests that an allocated port cannot be allocated again
// until it is released
func TestReallocate(t *testing.T) {
	_, storage, backing, si, destroyFunc := newStorage(t)
	defer destroyFunc()
	if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// Allocate a port inside the valid port range
	if err := storage.Allocate(30100); err != nil {
		t.Fatal(err)
	}

	// Try to allocate the same port in the local bitmap.
	// The local bitmap stores the offset of the port:
	// offset = port - base (30100 - 30000 = 100)
	ok, err := backing.Allocate(100)
	if err != nil {
		t.Fatal(err)
	}
	// The allocation must fail because the port is already allocated
	if ok {
		t.Fatal("Expected allocation to fail")
	}

	// Trying to allocate the same port again should fail
	if err := storage.Allocate(30100); err != portallocator.ErrAllocated {
		t.Fatal(err)
	}

	// Release the port
	if err := storage.Release(30100); err != nil {
		t.Fatal(err)
	}

	// Allocating the port should now succeed because it was released
	if err := storage.Allocate(30100); err != nil {
		t.Fatal(err)
	}
}
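
The offset arithmetic the comments above rely on is worth spelling out: the backing bitmap indexes ports by their distance from the range base. A tiny sketch (helper names are mine, not the package's):

package main

import "fmt"

const base = 30000 // range base, matching basePortRange above

// portToOffset converts a NodePort to its index in the backing bitmap.
func portToOffset(port int) int { return port - base }

// offsetToPort converts a bitmap index back to the NodePort it represents.
func offsetToPort(offset int) int { return base + offset }

func main() {
    fmt.Println(portToOffset(30100)) // 100, the offset used in TestReallocate
    fmt.Println(offsetToPort(100))   // 30100
}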


@@ -37,6 +37,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",


@@ -34,6 +34,7 @@ import (
 	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/registry/generic/registry"
 	"k8s.io/client-go/kubernetes"
@@ -493,3 +494,84 @@ func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) {
 func TestReconcilerMasterLeaseMultiCombined(t *testing.T) {
 	testReconcilersMasterLease(t, 3, 3)
 }
+
+func TestMultiMasterNodePortAllocation(t *testing.T) {
+	var kubeAPIServers []*kubeapiservertesting.TestServer
+	var clientAPIServers []*kubernetes.Clientset
+	etcd := framework.SharedEtcd()
+
+	instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{
+		DisableStorageCleanup: true,
+	}
+
+	// cleanup the registry storage
+	defer registry.CleanupStorage()
+
+	// create two API servers and a client for each
+	for i := 0; i < 2; i++ {
+		// start a master-count API server
+		t.Logf("starting api server: %d", i)
+		server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
+			"--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
+		}, etcd)
+		kubeAPIServers = append(kubeAPIServers, server)
+
+		// verify that the API servers have registered, and create a client
+		if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
+			client, err := kubernetes.NewForConfig(kubeAPIServers[i].ClientConfig)
+			if err != nil {
+				t.Logf("create client error: %v", err)
+				return false, nil
+			}
+			clientAPIServers = append(clientAPIServers, client)
+			endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
+			if err != nil {
+				t.Logf("error fetching endpoints: %v", err)
+				return false, nil
+			}
+			return verifyEndpointsWithIPs(kubeAPIServers, getEndpointIPs(endpoints)), nil
+		}); err != nil {
+			t.Fatalf("did not find only lease endpoints: %v", err)
+		}
+	}
+
+	serviceObject := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels: map[string]string{"foo": "bar"},
+			Name:   "test-node-port",
+		},
+		Spec: corev1.ServiceSpec{
+			Ports: []corev1.ServicePort{
+				{
+					Name:       "nodeport-test",
+					Port:       443,
+					TargetPort: intstr.IntOrString{IntVal: 443},
+					NodePort:   32080,
+					Protocol:   "TCP",
+				},
+			},
+			Type:     "NodePort",
+			Selector: map[string]string{"foo": "bar"},
+		},
+	}
+
+	// create and delete the same NodePort service using different API servers
+	// to check that they share the same port allocation bitmap; without the
+	// resync, the second Create would fail because the first server's local
+	// bitmap would still mark port 32080 as allocated after the delete
+	for i := 0; i < 2; i++ {
+		// create the service using the first API server
+		_, err := clientAPIServers[0].CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), serviceObject, metav1.CreateOptions{})
+		if err != nil {
+			t.Fatalf("unable to create service: %v", err)
+		}
+		// delete the service using the second API server
+		if err := clientAPIServers[1].CoreV1().Services(metav1.NamespaceDefault).Delete(context.TODO(), serviceObject.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
+			t.Fatalf("got unexpected error: %v", err)
+		}
+	}
+
+	// shut down the API servers
+	for _, server := range kubeAPIServers {
+		server.TearDownFn()
+	}
+}
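
The create/delete loop is the crux of this test: each pair crosses API servers, so the second Create on server 0 only succeeds if server 0 resynced its bitmap after a delete it never handled. A minimal model of that sequence (hypothetical names, not the test's helpers):

package main

import "fmt"

// view models one API server's local bitmap for NodePort allocations;
// persisted models the shared snapshot in etcd.
type view map[int]bool

var persisted = view{}

// allocate optionally resyncs the local view from the persisted snapshot
// (the behavior this PR introduces) and then tries to claim the port.
func allocate(local view, port int, resync bool) bool {
    if resync {
        for p := range local {
            delete(local, p)
        }
        for p := range persisted {
            local[p] = true
        }
    }
    if local[port] {
        return false
    }
    local[port] = true
    persisted[port] = true
    return true
}

// release removes the port from the persisted snapshot, as a delete served
// by a different API server would.
func release(port int) { delete(persisted, port) }

func main() {
    server0 := view{}
    fmt.Println(allocate(server0, 32080, true)) // create via server 0: true
    release(32080)                              // delete handled by server 1
    // The second create via server 0 succeeds only because of the resync;
    // with resync=false its stale view would still mark 32080 as taken.
    fmt.Println(allocate(server0, 32080, true)) // true
}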