diff --git a/pkg/registry/core/service/allocator/storage/storage.go b/pkg/registry/core/service/allocator/storage/storage.go index be93bbee34d..bd26d025c80 100644 --- a/pkg/registry/core/service/allocator/storage/storage.go +++ b/pkg/registry/core/service/allocator/storage/storage.go @@ -80,17 +80,12 @@ func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.Grou }, nil } -// Allocate attempts to allocate the item locally and then in etcd. +// Allocate attempts to allocate the item. func (e *Etcd) Allocate(offset int) (bool, error) { e.lock.Lock() defer e.lock.Unlock() - ok, err := e.alloc.Allocate(offset) - if !ok || err != nil { - return ok, err - } - - err = e.tryUpdate(func() error { + err := e.tryUpdate(func() error { ok, err := e.alloc.Allocate(offset) if err != nil { return err @@ -109,49 +104,44 @@ func (e *Etcd) Allocate(offset int) (bool, error) { return true, nil } -// AllocateNext attempts to allocate the next item locally and then in etcd. +// AllocateNext attempts to allocate the next item. func (e *Etcd) AllocateNext() (int, bool, error) { e.lock.Lock() defer e.lock.Unlock() - - offset, ok, err := e.alloc.AllocateNext() - if !ok || err != nil { - return offset, ok, err - } + var offset int + var ok bool + var err error err = e.tryUpdate(func() error { - ok, err := e.alloc.Allocate(offset) + // update the offset here + offset, ok, err = e.alloc.AllocateNext() if err != nil { return err } if !ok { - // update the offset here - offset, ok, err = e.alloc.AllocateNext() - if err != nil { - return err - } - if !ok { - return errorUnableToAllocate - } - return nil + return errorUnableToAllocate } return nil }) - return offset, ok, err + + if err != nil { + if err == errorUnableToAllocate { + return offset, false, nil + } + return offset, false, err + } + return offset, true, nil } -// Release attempts to release the provided item locally and then in etcd. +// Release attempts to release the provided item. func (e *Etcd) Release(item int) error { e.lock.Lock() defer e.lock.Unlock() - if err := e.alloc.Release(item); err != nil { - return err - } - return e.tryUpdate(func() error { return e.alloc.Release(item) }) + } func (e *Etcd) ForEach(fn func(int)) { @@ -172,9 +162,9 @@ func (e *Etcd) tryUpdate(fn func() error) error { if err := e.alloc.Restore(existing.Range, existing.Data); err != nil { return nil, err } - if err := fn(); err != nil { - return nil, err - } + } + if err := fn(); err != nil { + return nil, err } e.last = existing.ResourceVersion rangeSpec, data := e.alloc.Snapshot() diff --git a/pkg/registry/core/service/allocator/storage/storage_test.go b/pkg/registry/core/service/allocator/storage/storage_test.go index a4a24f8b299..7c3b0d1f211 100644 --- a/pkg/registry/core/service/allocator/storage/storage_test.go +++ b/pkg/registry/core/service/allocator/storage/storage_test.go @@ -102,3 +102,59 @@ func TestStore(t *testing.T) { t.Fatal(err) } } + +// Test that one item is allocated in storage but is not allocated locally +// When try to allocate it, it should fail despite it's free in the local bitmap +// bot not in the storage +func TestAllocatedStorageButReleasedLocally(t *testing.T) { + storage, server, backing, _ := newStorage(t) + defer server.Terminate(t) + if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Allocate an item in the storage + if _, err := storage.Allocate(2); err != nil { + t.Fatal(err) + } + + // Release the item in the local bitmap + // emulating it's out of sync with the storage + err := backing.Release(2) + if err != nil { + t.Fatal(err) + } + + // It should fail trying to allocate it deespite it's free + // in the local bitmap because it's not in the storage + ok, err := storage.Allocate(2) + if ok || err != nil { + t.Fatal(err) + } + +} + +// Test that one item is free in storage but is allocated locally +// When try to allocate it, it should succeed despite it's allocated +// in the local bitmap bot not in the storage +func TestAllocatedLocallyButReleasedStorage(t *testing.T) { + storage, server, backing, _ := newStorage(t) + defer server.Terminate(t) + if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Allocate an item in the local bitmap only but not in the storage + // emulating it's out of sync with the storage + if _, err := backing.Allocate(2); err != nil { + t.Fatal(err) + } + + // It should be able to allocate it + // because it's free in the storage + ok, err := storage.Allocate(2) + if !ok || err != nil { + t.Fatal(err) + } + +} diff --git a/test/integration/master/BUILD b/test/integration/master/BUILD index a646170f36d..a30bda253bd 100644 --- a/test/integration/master/BUILD +++ b/test/integration/master/BUILD @@ -40,6 +40,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", diff --git a/test/integration/master/kube_apiserver_test.go b/test/integration/master/kube_apiserver_test.go index 27f6be71e08..83c0aee4fe2 100644 --- a/test/integration/master/kube_apiserver_test.go +++ b/test/integration/master/kube_apiserver_test.go @@ -35,6 +35,7 @@ import ( apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/client-go/kubernetes" @@ -494,3 +495,84 @@ func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) { func TestReconcilerMasterLeaseMultiCombined(t *testing.T) { testReconcilersMasterLease(t, 3, 3) } + +func TestMultiMasterNodePortAllocation(t *testing.T) { + var kubeAPIServers []*kubeapiservertesting.TestServer + var clientAPIServers []*kubernetes.Clientset + etcd := framework.SharedEtcd() + + instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{ + DisableStorageCleanup: true, + } + + // cleanup the registry storage + defer registry.CleanupStorage() + + // create 2 api servers and 2 clients + for i := 0; i < 2; i++ { + // start master count api server + t.Logf("starting api server: %d", i) + server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{ + "--advertise-address", fmt.Sprintf("10.0.1.%v", i+1), + }, etcd) + kubeAPIServers = append(kubeAPIServers, server) + + // verify kube API servers have registered and create a client + if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) { + client, err := kubernetes.NewForConfig(kubeAPIServers[i].ClientConfig) + if err != nil { + t.Logf("create client error: %v", err) + return false, nil + } + clientAPIServers = append(clientAPIServers, client) + endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) + if err != nil { + t.Logf("error fetching endpoints: %v", err) + return false, nil + } + return verifyEndpointsWithIPs(kubeAPIServers, getEndpointIPs(endpoints)), nil + }); err != nil { + t.Fatalf("did not find only lease endpoints: %v", err) + } + } + + serviceObject := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo": "bar"}, + Name: "test-node-port", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "nodeport-test", + Port: 443, + TargetPort: intstr.IntOrString{IntVal: 443}, + NodePort: 32080, + Protocol: "TCP", + }, + }, + Type: "NodePort", + Selector: map[string]string{"foo": "bar"}, + }, + } + + // create and delete the same nodePortservice using different APIservers + // to check that API servers are using the same port allocation bitmap + for i := 0; i < 2; i++ { + // Create the service using the first API server + _, err := clientAPIServers[0].CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), serviceObject, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("unable to create service: %v", err) + } + // Delete the service using the second API server + if err := clientAPIServers[1].CoreV1().Services(metav1.NamespaceDefault).Delete(context.TODO(), serviceObject.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + + // shutdown the api servers + for _, server := range kubeAPIServers { + server.TearDownFn() + } + +}