diff --git a/federation/apis/federation/v1beta1/generated.proto b/federation/apis/federation/v1beta1/generated.proto
index 21738c5e7eb..7c8481e68e8 100644
--- a/federation/apis/federation/v1beta1/generated.proto
+++ b/federation/apis/federation/v1beta1/generated.proto
@@ -100,7 +100,7 @@ message ClusterSpec {
   optional k8s.io.kubernetes.pkg.api.v1.LocalObjectReference secretRef = 2;
 }
 
-// ClusterStatus is information about the current status of a cluster updated by cluster controller peridocally.
+// ClusterStatus is information about the current status of a cluster updated by cluster controller periodically.
 message ClusterStatus {
   // Conditions is an array of current cluster conditions.
   // +optional
diff --git a/pkg/registry/core/service/portallocator/allocator.go b/pkg/registry/core/service/portallocator/allocator.go
index e088bc91a52..d066fe2d578 100644
--- a/pkg/registry/core/service/portallocator/allocator.go
+++ b/pkg/registry/core/service/portallocator/allocator.go
@@ -78,11 +78,29 @@ func NewPortAllocator(pr net.PortRange) *PortAllocator {
     })
 }
 
+// NewFromSnapshot allocates a PortAllocator and initializes it from a snapshot.
+func NewFromSnapshot(snap *api.RangeAllocation) (*PortAllocator, error) {
+    pr, err := net.ParsePortRange(snap.Range)
+    if err != nil {
+        return nil, err
+    }
+    r := NewPortAllocator(*pr)
+    if err := r.Restore(*pr, snap.Data); err != nil {
+        return nil, err
+    }
+    return r, nil
+}
+
 // Free returns the count of port left in the range.
 func (r *PortAllocator) Free() int {
     return r.alloc.Free()
 }
 
+// Used returns the count of ports used in the range.
+func (r *PortAllocator) Used() int {
+    return r.portRange.Size - r.alloc.Free()
+}
+
 // Allocate attempts to reserve the provided port. ErrNotInRange or
 // ErrAllocated will be returned if the port is not valid for this range
 // or has already been reserved. ErrFull will be returned if there
diff --git a/pkg/registry/core/service/portallocator/allocator_test.go b/pkg/registry/core/service/portallocator/allocator_test.go
index 82501120427..e5613f1c915 100644
--- a/pkg/registry/core/service/portallocator/allocator_test.go
+++ b/pkg/registry/core/service/portallocator/allocator_test.go
@@ -35,6 +35,9 @@ func TestAllocate(t *testing.T) {
     if f := r.Free(); f != 201 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 0 {
+        t.Errorf("unexpected used %d", f)
+    }
     found := sets.NewString()
     count := 0
     for r.Free() > 0 {
@@ -62,6 +65,9 @@ func TestAllocate(t *testing.T) {
     if f := r.Free(); f != 1 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 200 {
+        t.Errorf("unexpected used %d", f)
+    }
     p, err := r.AllocateNext()
     if err != nil {
         t.Fatal(err)
     }
@@ -95,12 +101,18 @@ func TestAllocate(t *testing.T) {
     if f := r.Free(); f != 1 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 200 {
+        t.Errorf("unexpected used %d", f)
+    }
     if err := r.Allocate(released); err != nil {
         t.Fatal(err)
     }
     if f := r.Free(); f != 0 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 201 {
+        t.Errorf("unexpected used %d", f)
+    }
 }
 
 func TestForEach(t *testing.T) {
@@ -193,3 +205,42 @@ func TestSnapshot(t *testing.T) {
         t.Errorf("counts do not match: %d", other.Free())
     }
 }
+
+func TestNewFromSnapshot(t *testing.T) {
+    pr, err := net.ParsePortRange("200-300")
+    if err != nil {
+        t.Fatal(err)
+    }
+    r := NewPortAllocator(*pr)
+    allocated := []int{}
+    for i := 0; i < 50; i++ {
+        p, err := r.AllocateNext()
+        if err != nil {
+            t.Fatal(err)
+        }
+        allocated = append(allocated, p)
+    }
+
+    snapshot := api.RangeAllocation{}
+    if err = r.Snapshot(&snapshot); err != nil {
+        t.Fatal(err)
+    }
+
+    r, err = NewFromSnapshot(&snapshot)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    if x := r.Free(); x != 51 {
+        t.Fatalf("expected 51 free ports, got %d", x)
+    }
+    if x := r.Used(); x != 50 {
+        t.Fatalf("expected 50 used ports, got %d", x)
+    }
+
+    for _, p := range allocated {
+        if !r.Has(p) {
+            t.Fatalf("expected port to be allocated, but it was not")
+        }
+    }
+}
diff --git a/pkg/registry/core/service/portallocator/controller/BUILD b/pkg/registry/core/service/portallocator/controller/BUILD
index dc3e7420827..e64c8a963c5 100644
--- a/pkg/registry/core/service/portallocator/controller/BUILD
+++ b/pkg/registry/core/service/portallocator/controller/BUILD
@@ -5,6 +5,7 @@ licenses(["notice"])
 load(
     "@io_bazel_rules_go//go:def.bzl",
     "go_library",
+    "go_test",
 )
 
 go_library(
@@ -24,3 +25,16 @@ go_library(
         "//pkg/util/wait:go_default_library",
     ],
 )
+
+go_test(
+    name = "go_default_test",
+    srcs = ["repair_test.go"],
+    library = "go_default_library",
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
+        "//pkg/registry/core/service/portallocator:go_default_library",
+        "//pkg/util/net:go_default_library",
+    ],
+)
diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go
index ceaa41f1ff8..9bba150c037 100644
--- a/pkg/registry/core/service/portallocator/controller/repair.go
+++ b/pkg/registry/core/service/portallocator/controller/repair.go
@@ -38,8 +38,13 @@ type Repair struct {
     serviceClient coreclient.ServicesGetter
     portRange     net.PortRange
     alloc         rangeallocation.RangeRegistry
+    leaks         map[int]int // counter per leaked port
 }
 
+// How many times we need to detect a leak before we clean up.  This is to
+// avoid races between allocating a port and using it.
+const numRepairsBeforeLeakCleanup = 3
+
 // NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
 // and generates informational warnings for a cluster that is not in sync.
 func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
@@ -48,6 +53,7 @@ func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter,
         serviceClient: serviceClient,
         portRange:     portRange,
         alloc:         alloc,
+        leaks:         map[int]int{},
     }
 }
 
@@ -76,21 +82,28 @@ func (c *Repair) runOnce() error {
 
     // If etcd server is not running we should wait for some time and fail only then. This is particularly
     // important when we start apiserver and etcd at the same time.
-    var latest *api.RangeAllocation
-    var err error
-    for i := 0; i < 10; i++ {
-        if latest, err = c.alloc.Get(); err != nil {
-            time.Sleep(time.Second)
-        } else {
-            break
-        }
-    }
+    var snapshot *api.RangeAllocation
+
+    err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
+        var err error
+        snapshot, err = c.alloc.Get()
+        return err == nil, err
+    })
     if err != nil {
-        return fmt.Errorf("unable to refresh the port block: %v", err)
+        return fmt.Errorf("unable to refresh the port allocations: %v", err)
+    }
+    // If not yet initialized.
+    if snapshot.Range == "" {
+        snapshot.Range = c.portRange.String()
+    }
+    // Create an allocator because it is easy to use.
+    stored, err := portallocator.NewFromSnapshot(snapshot)
+    if err != nil {
+        return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
     }
 
     // We explicitly send no resource version, since the resource version
-    // of 'latest' is from a different collection, it's not comparable to
+    // of 'snapshot' is from a different collection, it's not comparable to
     // the service collection. The caching layer keeps per-collection RVs,
     // and this is proper, since in theory the collections could be hosted
     // in separate etcd (or even non-etcd) instances.
@@ -99,7 +112,8 @@
         return fmt.Errorf("unable to refresh the port block: %v", err)
     }
 
-    r := portallocator.NewPortAllocator(c.portRange)
+    rebuilt := portallocator.NewPortAllocator(c.portRange)
+    // Check every Service's ports, and rebuild the state as we think it should be.
     for i := range list.Items {
         svc := &list.Items[i]
         ports := service.CollectServiceNodePorts(svc)
@@ -108,18 +122,27 @@
         }
         for _, port := range ports {
-            switch err := r.Allocate(port); err {
+            switch err := rebuilt.Allocate(port); err {
             case nil:
+                if stored.Has(port) {
+                    // remove it from the old set, so we can find leaks
+                    stored.Release(port)
+                } else {
+                    // doesn't seem to be allocated
+                    runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
+                }
+                delete(c.leaks, port) // it is used, so it can't be leaked
             case portallocator.ErrAllocated:
                 // TODO: send event
-                // port is broken, reallocate
-                runtime.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
+                // port is duplicate, reallocate
+                runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
             case err.(*portallocator.ErrNotInRange):
                 // TODO: send event
-                // port is broken, reallocate
+                // port is out of range, reallocate
                 runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
             case portallocator.ErrFull:
                 // TODO: send event
+                // somehow we are out of ports
                 return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)
             default:
                 return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
             }
@@ -127,12 +150,33 @@
         }
     }
 
-    err = r.Snapshot(latest)
-    if err != nil {
+    // Check for ports that are left in the old set. They appear to have been leaked.
+    stored.ForEach(func(port int) {
+        count, found := c.leaks[port]
+        switch {
+        case !found:
+            // flag it to be cleaned up after any races (hopefully) are gone
+            runtime.HandleError(fmt.Errorf("the node port %d may have leaked: flagging for later clean up", port))
+            count = numRepairsBeforeLeakCleanup - 1
+            fallthrough
+        case count > 0:
+            // pretend it is still in use until count expires
+            c.leaks[port] = count - 1
+            if err := rebuilt.Allocate(port); err != nil {
+                runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
+            }
+        default:
+            // do not add it to the rebuilt set, which means it will be available for reuse
+            runtime.HandleError(fmt.Errorf("the node port %d appears to have leaked: cleaning up", port))
+        }
+    })
+
+    // Blast the rebuilt state into storage.
+    if err := rebuilt.Snapshot(snapshot); err != nil {
         return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
     }
-    if err := c.alloc.CreateOrUpdate(latest); err != nil {
+    if err := c.alloc.CreateOrUpdate(snapshot); err != nil {
         if errors.IsConflict(err) {
             return err
         }
diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go
new file mode 100644
index 00000000000..b5108d1e12b
--- /dev/null
+++ b/pkg/registry/core/service/portallocator/controller/repair_test.go
@@ -0,0 +1,191 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+    "fmt"
+    "strings"
+    "testing"
+
+    "k8s.io/kubernetes/pkg/api"
+    "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+    "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
+    "k8s.io/kubernetes/pkg/util/net"
+)
+
+type mockRangeRegistry struct {
+    getCalled bool
+    item      *api.RangeAllocation
+    err       error
+
+    updateCalled bool
+    updated      *api.RangeAllocation
+    updateErr    error
+}
+
+func (r *mockRangeRegistry) Get() (*api.RangeAllocation, error) {
+    r.getCalled = true
+    return r.item, r.err
+}
+
+func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
+    r.updateCalled = true
+    r.updated = alloc
+    return r.updateErr
+}
+
+func TestRepair(t *testing.T) {
+    fakeClient := fake.NewSimpleClientset()
+    registry := &mockRangeRegistry{
+        item: &api.RangeAllocation{Range: "100-200"},
+    }
+    pr, _ := net.ParsePortRange(registry.item.Range)
+    r := NewRepair(0, fakeClient.Core(), *pr, registry)
+
+    if err := r.RunOnce(); err != nil {
+        t.Fatal(err)
+    }
+    if !registry.updateCalled || registry.updated == nil || registry.updated.Range != pr.String() || registry.updated != registry.item {
+        t.Errorf("unexpected registry: %#v", registry)
+    }
+
+    registry = &mockRangeRegistry{
+        item:      &api.RangeAllocation{Range: "100-200"},
+        updateErr: fmt.Errorf("test error"),
+    }
+    r = NewRepair(0, fakeClient.Core(), *pr, registry)
+    if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
+        t.Fatal(err)
+    }
+}
+
+func TestRepairLeak(t *testing.T) {
+    pr, _ := net.ParsePortRange("100-200")
+    previous := portallocator.NewPortAllocator(*pr)
+    previous.Allocate(111)
+
+    var dst api.RangeAllocation
+    err := previous.Snapshot(&dst)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    fakeClient := fake.NewSimpleClientset()
+    registry := &mockRangeRegistry{
+        item: &api.RangeAllocation{
+            ObjectMeta: api.ObjectMeta{
+                ResourceVersion: "1",
+            },
+            Range: dst.Range,
+            Data:  dst.Data,
+        },
+    }
+
+    r := NewRepair(0, fakeClient.Core(), *pr, registry)
+    // Run through the "leak detection holdoff" loops.
+    for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
+        if err := r.RunOnce(); err != nil {
+            t.Fatal(err)
+        }
+        after, err := portallocator.NewFromSnapshot(registry.updated)
+        if err != nil {
+            t.Fatal(err)
+        }
+        if !after.Has(111) {
+            t.Errorf("expected portallocator to still have leaked port")
+        }
+    }
+    // Run one more time to actually remove the leak.
+    if err := r.RunOnce(); err != nil {
+        t.Fatal(err)
+    }
+    after, err := portallocator.NewFromSnapshot(registry.updated)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if after.Has(111) {
+        t.Errorf("expected portallocator to not have leaked port")
+    }
+}
+
+func TestRepairWithExisting(t *testing.T) {
+    pr, _ := net.ParsePortRange("100-200")
+    previous := portallocator.NewPortAllocator(*pr)
+
+    var dst api.RangeAllocation
+    err := previous.Snapshot(&dst)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    fakeClient := fake.NewSimpleClientset(
+        &api.Service{
+            ObjectMeta: api.ObjectMeta{Namespace: "one", Name: "one"},
+            Spec: api.ServiceSpec{
+                Ports: []api.ServicePort{{NodePort: 111}},
+            },
+        },
+        &api.Service{
+            ObjectMeta: api.ObjectMeta{Namespace: "two", Name: "two"},
+            Spec: api.ServiceSpec{
+                Ports: []api.ServicePort{{NodePort: 122}, {NodePort: 133}},
+            },
+        },
+        &api.Service{ // outside range, will be dropped
+            ObjectMeta: api.ObjectMeta{Namespace: "three", Name: "three"},
+            Spec: api.ServiceSpec{
+                Ports: []api.ServicePort{{NodePort: 201}},
+            },
+        },
+        &api.Service{ // empty, ignored
+            ObjectMeta: api.ObjectMeta{Namespace: "four", Name: "four"},
+            Spec: api.ServiceSpec{
+                Ports: []api.ServicePort{{}},
+            },
+        },
+        &api.Service{ // duplicate, dropped
+            ObjectMeta: api.ObjectMeta{Namespace: "five", Name: "five"},
+            Spec: api.ServiceSpec{
+                Ports: []api.ServicePort{{NodePort: 111}},
+            },
+        },
+    )
+
+    registry := &mockRangeRegistry{
+        item: &api.RangeAllocation{
+            ObjectMeta: api.ObjectMeta{
+                ResourceVersion: "1",
+            },
+            Range: dst.Range,
+            Data:  dst.Data,
+        },
+    }
+    r := NewRepair(0, fakeClient.Core(), *pr, registry)
+    if err := r.RunOnce(); err != nil {
+        t.Fatal(err)
+    }
+    after, err := portallocator.NewFromSnapshot(registry.updated)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if !after.Has(111) || !after.Has(122) || !after.Has(133) {
+        t.Errorf("unexpected portallocator state: %#v", after)
+    }
+    if free := after.Free(); free != 98 {
+        t.Errorf("unexpected portallocator state: %d free", free)
+    }
+}