diff --git a/federation/apis/federation/v1beta1/generated.proto b/federation/apis/federation/v1beta1/generated.proto index 21738c5e7eb..7c8481e68e8 100644 --- a/federation/apis/federation/v1beta1/generated.proto +++ b/federation/apis/federation/v1beta1/generated.proto @@ -100,7 +100,7 @@ message ClusterSpec { optional k8s.io.kubernetes.pkg.api.v1.LocalObjectReference secretRef = 2; } -// ClusterStatus is information about the current status of a cluster updated by cluster controller peridocally. +// ClusterStatus is information about the current status of a cluster updated by cluster controller periodically. message ClusterStatus { // Conditions is an array of current cluster conditions. // +optional diff --git a/hack/update-openapi-spec.sh b/hack/update-openapi-spec.sh index 0723f40e4aa..b0016bc29b9 100755 --- a/hack/update-openapi-spec.sh +++ b/hack/update-openapi-spec.sh @@ -65,6 +65,8 @@ kube::log::status "Starting kube-apiserver" --cert-dir="${TMP_DIR}/certs" \ --runtime-config="api/all=true" \ --token-auth-file=$TMP_DIR/tokenauth.csv \ + --logtostderr \ + --v=2 \ --service-cluster-ip-range="10.0.0.0/24" >/tmp/openapi-api-server.log 2>&1 & APISERVER_PID=$! diff --git a/pkg/registry/core/service/allocator/BUILD b/pkg/registry/core/service/allocator/BUILD index 49e201c2967..868f9119310 100644 --- a/pkg/registry/core/service/allocator/BUILD +++ b/pkg/registry/core/service/allocator/BUILD @@ -26,5 +26,5 @@ go_test( ], library = "go_default_library", tags = ["automanaged"], - deps = [], + deps = ["//vendor:k8s.io/client-go/pkg/util/sets"], ) diff --git a/pkg/registry/core/service/allocator/bitmap.go b/pkg/registry/core/service/allocator/bitmap.go index 9394d59fc3c..225e8597407 100644 --- a/pkg/registry/core/service/allocator/bitmap.go +++ b/pkg/registry/core/service/allocator/bitmap.go @@ -124,6 +124,33 @@ func (r *AllocationBitmap) Release(offset int) error { return nil } +const ( + // Find the size of a big.Word in bytes. + notZero = uint64(^big.Word(0)) + wordPower = (notZero>>8)&1 + (notZero>>16)&1 + (notZero>>32)&1 + wordSize = 1 << wordPower +) + +// ForEach calls the provided function for each allocated bit. The +// AllocationBitmap may not be modified while this loop is running. +func (r *AllocationBitmap) ForEach(fn func(int)) { + r.lock.Lock() + defer r.lock.Unlock() + + words := r.allocated.Bits() + for wordIdx, word := range words { + bit := 0 + for word > 0 { + if (word & 1) != 0 { + fn((wordIdx * wordSize * 8) + bit) + word = word &^ 1 + } + bit++ + word = word >> 1 + } + } +} + // Has returns true if the provided item is already allocated and a call // to Allocate(offset) would fail. 
func (r *AllocationBitmap) Has(offset int) bool { diff --git a/pkg/registry/core/service/allocator/bitmap_test.go b/pkg/registry/core/service/allocator/bitmap_test.go index 14139b2b3ed..e17055a975c 100644 --- a/pkg/registry/core/service/allocator/bitmap_test.go +++ b/pkg/registry/core/service/allocator/bitmap_test.go @@ -18,6 +18,8 @@ package allocator import ( "testing" + + "k8s.io/client-go/pkg/util/sets" ) func TestAllocate(t *testing.T) { @@ -82,6 +84,37 @@ func TestRelease(t *testing.T) { } } +func TestForEach(t *testing.T) { + testCases := []sets.Int{ + sets.NewInt(), + sets.NewInt(0), + sets.NewInt(0, 2, 5, 9), + sets.NewInt(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), + } + + for i, tc := range testCases { + m := NewAllocationMap(10, "test") + for offset := range tc { + if ok, _ := m.Allocate(offset); !ok { + t.Errorf("[%d] error allocate offset %v", i, offset) + } + if !m.Has(offset) { + t.Errorf("[%d] expect offset %v allocated", i, offset) + } + } + calls := sets.NewInt() + m.ForEach(func(i int) { + calls.Insert(i) + }) + if len(calls) != len(tc) { + t.Errorf("[%d] expected %d calls, got %d", i, len(tc), len(calls)) + } + if !calls.Equal(tc) { + t.Errorf("[%d] expected calls to equal testcase: %v vs %v", i, calls.List(), tc.List()) + } + } +} + func TestSnapshotAndRestore(t *testing.T) { offset := 3 m := NewAllocationMap(10, "test") diff --git a/pkg/registry/core/service/allocator/etcd/etcd.go b/pkg/registry/core/service/allocator/etcd/etcd.go index 509b116d696..2400f7fd17e 100644 --- a/pkg/registry/core/service/allocator/etcd/etcd.go +++ b/pkg/registry/core/service/allocator/etcd/etcd.go @@ -144,6 +144,12 @@ func (e *Etcd) Release(item int) error { }) } +func (e *Etcd) ForEach(fn func(int)) { + e.lock.Lock() + defer e.lock.Unlock() + e.alloc.ForEach(fn) +} + // tryUpdate performs a read-update to persist the latest snapshot state of allocation. func (e *Etcd) tryUpdate(fn func() error) error { err := e.storage.GuaranteedUpdate(context.TODO(), e.baseKey, &api.RangeAllocation{}, true, nil, diff --git a/pkg/registry/core/service/allocator/interfaces.go b/pkg/registry/core/service/allocator/interfaces.go index 88231dafc12..0d18feff203 100644 --- a/pkg/registry/core/service/allocator/interfaces.go +++ b/pkg/registry/core/service/allocator/interfaces.go @@ -22,6 +22,7 @@ type Interface interface { Allocate(int) (bool, error) AllocateNext() (int, bool, error) Release(int) error + ForEach(func(int)) // For testing Has(int) bool diff --git a/pkg/registry/core/service/ipallocator/allocator.go b/pkg/registry/core/service/ipallocator/allocator.go index 85e0b5f6797..7adb30101bb 100644 --- a/pkg/registry/core/service/ipallocator/allocator.go +++ b/pkg/registry/core/service/ipallocator/allocator.go @@ -32,6 +32,7 @@ type Interface interface { Allocate(net.IP) error AllocateNext() (net.IP, error) Release(net.IP) error + ForEach(func(net.IP)) } var ( @@ -89,6 +90,19 @@ func NewCIDRRange(cidr *net.IPNet) *Range { }) } +// NewFromSnapshot allocates a Range and initializes it from a snapshot. +func NewFromSnapshot(snap *api.RangeAllocation) (*Range, error) { + _, ipnet, err := net.ParseCIDR(snap.Range) + if err != nil { + return nil, err + } + r := NewCIDRRange(ipnet) + if err := r.Restore(ipnet, snap.Data); err != nil { + return nil, err + } + return r, nil +} + func maximum(a, b int) int { if a > b { return a @@ -101,6 +115,16 @@ func (r *Range) Free() int { return r.alloc.Free() } +// Used returns the count of IP addresses used in the range. 
+func (r *Range) Used() int { + return r.max - r.alloc.Free() +} + +// CIDR returns the CIDR covered by the range. +func (r *Range) CIDR() net.IPNet { + return *r.net +} + // Allocate attempts to reserve the provided IP. ErrNotInRange or // ErrAllocated will be returned if the IP is not valid for this range // or has already been reserved. ErrFull will be returned if there @@ -146,6 +170,14 @@ func (r *Range) Release(ip net.IP) error { return r.alloc.Release(offset) } +// ForEach calls the provided function for each allocated IP. +func (r *Range) ForEach(fn func(net.IP)) { + r.alloc.ForEach(func(offset int) { + ip, _ := GetIndexedIP(r.net, offset+1) // +1 because Range doesn't store IP 0 + fn(ip) + }) +} + // Has returns true if the provided IP is already allocated and a call // to Allocate(ip) would fail with ErrAllocated. func (r *Range) Has(ip net.IP) bool { diff --git a/pkg/registry/core/service/ipallocator/allocator_test.go b/pkg/registry/core/service/ipallocator/allocator_test.go index d8e5bfb73d2..37f72f79df3 100644 --- a/pkg/registry/core/service/ipallocator/allocator_test.go +++ b/pkg/registry/core/service/ipallocator/allocator_test.go @@ -34,6 +34,9 @@ func TestAllocate(t *testing.T) { if f := r.Free(); f != 254 { t.Errorf("unexpected free %d", f) } + if f := r.Used(); f != 0 { + t.Errorf("unexpected used %d", f) + } found := sets.NewString() count := 0 for r.Free() > 0 { @@ -61,6 +64,9 @@ func TestAllocate(t *testing.T) { if f := r.Free(); f != 1 { t.Errorf("unexpected free %d", f) } + if f := r.Used(); f != 253 { + t.Errorf("unexpected used %d", f) + } ip, err := r.AllocateNext() if err != nil { t.Fatal(err) } @@ -87,12 +93,18 @@ func TestAllocate(t *testing.T) { if f := r.Free(); f != 1 { t.Errorf("unexpected free %d", f) } + if f := r.Used(); f != 253 { + t.Errorf("unexpected used %d", f) + } if err := r.Allocate(released); err != nil { t.Fatal(err) } if f := r.Free(); f != 0 { t.Errorf("unexpected free %d", f) } + if f := r.Used(); f != 254 { + t.Errorf("unexpected used %d", f) + } } func TestAllocateTiny(t *testing.T) { @@ -167,6 +179,43 @@ func TestRangeSize(t *testing.T) { } } +func TestForEach(t *testing.T) { + _, cidr, err := net.ParseCIDR("192.168.1.0/24") + if err != nil { + t.Fatal(err) + } + + testCases := []sets.String{ + sets.NewString(), + sets.NewString("192.168.1.1"), + sets.NewString("192.168.1.1", "192.168.1.254"), + sets.NewString("192.168.1.1", "192.168.1.128", "192.168.1.254"), + } + + for i, tc := range testCases { + r := NewCIDRRange(cidr) + for ips := range tc { + ip := net.ParseIP(ips) + if err := r.Allocate(ip); err != nil { + t.Errorf("[%d] error allocating IP %v: %v", i, ip, err) + } + if !r.Has(ip) { + t.Errorf("[%d] expected IP %v allocated", i, ip) + } + } + calls := sets.NewString() + r.ForEach(func(ip net.IP) { + calls.Insert(ip.String()) + }) + if len(calls) != len(tc) { + t.Errorf("[%d] expected %d calls, got %d", i, len(tc), len(calls)) + } + if !calls.Equal(tc) { + t.Errorf("[%d] expected calls to equal testcase: %v vs %v", i, calls.List(), tc.List()) + } + } +} + func TestSnapshot(t *testing.T) { _, cidr, err := net.ParseCIDR("192.168.1.0/24") if err != nil { @@ -219,3 +268,42 @@ func TestSnapshot(t *testing.T) { t.Errorf("counts do not match: %d", other.Free()) } } + +func TestNewFromSnapshot(t *testing.T) { + _, cidr, err := net.ParseCIDR("192.168.0.0/24") + if err != nil { + t.Fatal(err) + } + r := NewCIDRRange(cidr) + allocated := []net.IP{} + for i := 0; i < 128; i++ { + ip, err := r.AllocateNext() + if err != nil { + t.Fatal(err) + } + 
allocated = append(allocated, ip) + } + + snapshot := api.RangeAllocation{} + if err = r.Snapshot(&snapshot); err != nil { + t.Fatal(err) + } + + r, err = NewFromSnapshot(&snapshot) + if err != nil { + t.Fatal(err) + } + + if x := r.Free(); x != 126 { + t.Fatalf("expected 126 free IPs, got %d", x) + } + if x := r.Used(); x != 128 { + t.Fatalf("expected 128 used IPs, got %d", x) + } + + for _, ip := range allocated { + if !r.Has(ip) { + t.Fatalf("expected IP to be allocated, but it was not") + } + } +} diff --git a/pkg/registry/core/service/ipallocator/controller/repair.go b/pkg/registry/core/service/ipallocator/controller/repair.go index 4360689981a..6095a91c83d 100644 --- a/pkg/registry/core/service/ipallocator/controller/repair.go +++ b/pkg/registry/core/service/ipallocator/controller/repair.go @@ -51,8 +51,13 @@ type Repair struct { serviceClient coreclient.ServicesGetter network *net.IPNet alloc rangeallocation.RangeRegistry + leaks map[string]int // counter per leaked IP } +// How many times we need to detect a leak before we clean up. This is to +// avoid races between allocating an IP and using it. +const numRepairsBeforeLeakCleanup = 3 + // NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster // and generates informational warnings for a cluster that is not in sync. func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair { @@ -61,6 +66,7 @@ func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, serviceClient: serviceClient, network: network, alloc: alloc, + leaks: map[string]int{}, } } @@ -89,18 +95,27 @@ func (c *Repair) runOnce() error { // If etcd server is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and etcd at the same time. - var latest *api.RangeAllocation - var err error - err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { - latest, err = c.alloc.Get() + var snapshot *api.RangeAllocation + err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + var err error + snapshot, err = c.alloc.Get() return err == nil, err }) if err != nil { return fmt.Errorf("unable to refresh the service IP block: %v", err) } + // If not yet initialized. + if snapshot.Range == "" { + snapshot.Range = c.network.String() + } + // Create an allocator because it is easy to use. + stored, err := ipallocator.NewFromSnapshot(snapshot) + if err != nil { + return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err) + } // We explicitly send no resource version, since the resource version - // of 'latest' is from a different collection, it's not comparable to + // of 'snapshot' is from a different collection, it's not comparable to // the service collection. The caching layer keeps per-collection RVs, // and this is proper, since in theory the collections could be hosted // in separate etcd (or even non-etcd) instances. @@ -109,40 +124,73 @@ func (c *Repair) runOnce() error { return fmt.Errorf("unable to refresh the service IP block: %v", err) } - r := ipallocator.NewCIDRRange(c.network) + rebuilt := ipallocator.NewCIDRRange(c.network) + // Check every Service's ClusterIP, and rebuild the state as we think it should be. 
for _, svc := range list.Items { if !api.IsServiceIPSet(&svc) { + // didn't need a cluster IP continue } ip := net.ParseIP(svc.Spec.ClusterIP) if ip == nil { - // cluster IP is broken, reallocate + // cluster IP is corrupt runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace)) continue } - switch err := r.Allocate(ip); err { + // mark it as in-use + switch err := rebuilt.Allocate(ip); err { case nil: + if stored.Has(ip) { + // remove it from the old set, so we can find leaks + stored.Release(ip) + } else { + // cluster IP doesn't seem to be allocated + runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", svc.Spec.ClusterIP, svc.Name, svc.Namespace)) + } + delete(c.leaks, ip.String()) // it is used, so it can't be leaked case ipallocator.ErrAllocated: // TODO: send event - // cluster IP is broken, reallocate + // cluster IP is duplicate runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace)) case ipallocator.ErrNotInRange: // TODO: send event - // cluster IP is broken, reallocate + // cluster IP is out of range runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network)) case ipallocator.ErrFull: // TODO: send event - return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", r) + // somehow we are out of IPs + return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", rebuilt) default: return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err) } } - if err := r.Snapshot(latest); err != nil { + // Check for IPs that are left in the old set. They appear to have been leaked. + stored.ForEach(func(ip net.IP) { + count, found := c.leaks[ip.String()] + switch { + case !found: + // flag it to be cleaned up after any races (hopefully) are gone + runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked: flagging for later clean up", ip)) + count = numRepairsBeforeLeakCleanup - 1 + fallthrough + case count > 0: + // pretend it is still in use until count expires + c.leaks[ip.String()] = count - 1 + if err := rebuilt.Allocate(ip); err != nil { + runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err)) + } + default: + // do not add it to the rebuilt set, which means it will be available for reuse + runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip)) + } + }) + + // Blast the rebuilt state into storage. 
+ if err := rebuilt.Snapshot(snapshot); err != nil { return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err) } - - if err := c.alloc.CreateOrUpdate(latest); err != nil { + if err := c.alloc.CreateOrUpdate(snapshot); err != nil { if errors.IsConflict(err) { return err } diff --git a/pkg/registry/core/service/ipallocator/controller/repair_test.go b/pkg/registry/core/service/ipallocator/controller/repair_test.go index 9306d89c178..7aa3670de2a 100644 --- a/pkg/registry/core/service/ipallocator/controller/repair_test.go +++ b/pkg/registry/core/service/ipallocator/controller/repair_test.go @@ -50,10 +50,10 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error { func TestRepair(t *testing.T) { fakeClient := fake.NewSimpleClientset() - _, cidr, _ := net.ParseCIDR("192.168.1.0/24") ipregistry := &mockRangeRegistry{ - item: &api.RangeAllocation{}, + item: &api.RangeAllocation{Range: "192.168.1.0/24"}, } + _, cidr, _ := net.ParseCIDR(ipregistry.item.Range) r := NewRepair(0, fakeClient.Core(), cidr, ipregistry) if err := r.RunOnce(); err != nil { @@ -64,7 +64,7 @@ func TestRepair(t *testing.T) { } ipregistry = &mockRangeRegistry{ - item: &api.RangeAllocation{}, + item: &api.RangeAllocation{Range: "192.168.1.0/24"}, updateErr: fmt.Errorf("test error"), } r = NewRepair(0, fakeClient.Core(), cidr, ipregistry) @@ -73,7 +73,7 @@ func TestRepair(t *testing.T) { } } -func TestRepairEmpty(t *testing.T) { +func TestRepairLeak(t *testing.T) { _, cidr, _ := net.ParseCIDR("192.168.1.0/24") previous := ipallocator.NewCIDRRange(cidr) previous.Allocate(net.ParseIP("192.168.1.10")) @@ -94,16 +94,31 @@ func TestRepairEmpty(t *testing.T) { Data: dst.Data, }, } + r := NewRepair(0, fakeClient.Core(), cidr, ipregistry) + // Run through the "leak detection holdoff" loops. + for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ { + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + after, err := ipallocator.NewFromSnapshot(ipregistry.updated) + if err != nil { + t.Fatal(err) + } + if !after.Has(net.ParseIP("192.168.1.10")) { + t.Errorf("expected ipallocator to still have leaked IP") + } + } + // Run one more time to actually remove the leak. 
if err := r.RunOnce(); err != nil { t.Fatal(err) } - after := ipallocator.NewCIDRRange(cidr) - if err := after.Restore(cidr, ipregistry.updated.Data); err != nil { + after, err := ipallocator.NewFromSnapshot(ipregistry.updated) + if err != nil { t.Fatal(err) } if after.Has(net.ParseIP("192.168.1.10")) { - t.Errorf("unexpected ipallocator state: %#v", after) + t.Errorf("expected ipallocator to not have leaked IP") } } @@ -157,14 +172,14 @@ func TestRepairWithExisting(t *testing.T) { if err := r.RunOnce(); err != nil { t.Fatal(err) } - after := ipallocator.NewCIDRRange(cidr) - if err := after.Restore(cidr, ipregistry.updated.Data); err != nil { + after, err := ipallocator.NewFromSnapshot(ipregistry.updated) + if err != nil { t.Fatal(err) } if !after.Has(net.ParseIP("192.168.1.1")) || !after.Has(net.ParseIP("192.168.1.100")) { t.Errorf("unexpected ipallocator state: %#v", after) } - if after.Free() != 252 { - t.Errorf("unexpected ipallocator state: %#v", after) + if free := after.Free(); free != 252 { + t.Errorf("unexpected ipallocator state: %d free", free) } } diff --git a/pkg/registry/core/service/portallocator/allocator.go b/pkg/registry/core/service/portallocator/allocator.go index 8e3d42be08c..d066fe2d578 100644 --- a/pkg/registry/core/service/portallocator/allocator.go +++ b/pkg/registry/core/service/portallocator/allocator.go @@ -33,6 +33,7 @@ type Interface interface { Allocate(int) error AllocateNext() (int, error) Release(int) error + ForEach(func(int)) } var ( @@ -77,11 +78,29 @@ func NewPortAllocator(pr net.PortRange) *PortAllocator { }) } +// NewFromSnapshot allocates a PortAllocator and initializes it from a snapshot. +func NewFromSnapshot(snap *api.RangeAllocation) (*PortAllocator, error) { + pr, err := net.ParsePortRange(snap.Range) + if err != nil { + return nil, err + } + r := NewPortAllocator(*pr) + if err := r.Restore(*pr, snap.Data); err != nil { + return nil, err + } + return r, nil +} + // Free returns the count of port left in the range. func (r *PortAllocator) Free() int { return r.alloc.Free() } +// Used returns the count of ports used in the range. +func (r *PortAllocator) Used() int { + return r.portRange.Size - r.alloc.Free() +} + // Allocate attempts to reserve the provided port. ErrNotInRange or // ErrAllocated will be returned if the port is not valid for this range // or has already been reserved. ErrFull will be returned if there @@ -117,6 +136,13 @@ func (r *PortAllocator) AllocateNext() (int, error) { return r.portRange.Base + offset, nil } +// ForEach calls the provided function for each allocated port. +func (r *PortAllocator) ForEach(fn func(int)) { + r.alloc.ForEach(func(offset int) { + fn(r.portRange.Base + offset) + }) +} + // Release releases the port back to the pool. Releasing an // unallocated port or a port out of the range is a no-op and // returns no error. 
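For reference, here is a standalone sketch (not part of the patch) of the set-bit walk that the new AllocationBitmap.ForEach performs and that Range.ForEach and PortAllocator.ForEach delegate to: each big.Word of the bitmap is scanned from the low bit upward, and the callback receives the absolute bit offset. The word-size constants mirror the ones added in bitmap.go above; forEachSetBit and the sample offsets are illustrative only.

package main

import (
	"fmt"
	"math/big"
)

const (
	// Size of a big.Word in bytes, computed the same way as in bitmap.go.
	notZero   = uint64(^big.Word(0))
	wordPower = (notZero>>8)&1 + (notZero>>16)&1 + (notZero>>32)&1
	wordSize  = 1 << wordPower
)

// forEachSetBit calls fn with the offset of every set bit in n, lowest first.
func forEachSetBit(n *big.Int, fn func(int)) {
	for wordIdx, word := range n.Bits() {
		bit := 0
		for word > 0 {
			if word&1 != 0 {
				fn(wordIdx*wordSize*8 + bit)
			}
			bit++
			word >>= 1
		}
	}
}

func main() {
	// Mark a few offsets as "allocated", then walk them back out in order.
	n := big.NewInt(0)
	for _, offset := range []int{0, 2, 5, 9, 70} {
		n.SetBit(n, offset, 1)
	}
	forEachSetBit(n, func(offset int) { fmt.Println("allocated offset", offset) })
}

This walk is what lets the repair controllers in this patch enumerate every allocated IP or node port without reaching into the allocator's internal bitmap representation.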
diff --git a/pkg/registry/core/service/portallocator/allocator_test.go b/pkg/registry/core/service/portallocator/allocator_test.go index 7386172b5e1..e5613f1c915 100644 --- a/pkg/registry/core/service/portallocator/allocator_test.go +++ b/pkg/registry/core/service/portallocator/allocator_test.go @@ -35,6 +35,9 @@ func TestAllocate(t *testing.T) { if f := r.Free(); f != 201 { t.Errorf("unexpected free %d", f) } + if f := r.Used(); f != 0 { + t.Errorf("unexpected used %d", f) + } found := sets.NewString() count := 0 for r.Free() > 0 { @@ -62,6 +65,9 @@ func TestAllocate(t *testing.T) { if f := r.Free(); f != 1 { t.Errorf("unexpected free %d", f) } + if f := r.Used(); f != 200 { + t.Errorf("unexpected used %d", f) + } p, err := r.AllocateNext() if err != nil { t.Fatal(err) @@ -95,12 +101,56 @@ func TestAllocate(t *testing.T) { if f := r.Free(); f != 1 { t.Errorf("unexpected free %d", f) } + if f := r.Used(); f != 200 { + t.Errorf("unexpected used %d", f) + } if err := r.Allocate(released); err != nil { t.Fatal(err) } if f := r.Free(); f != 0 { t.Errorf("unexpected free %d", f) } + if f := r.Used(); f != 201 { + t.Errorf("unexpected used %d", f) + } +} + +func TestForEach(t *testing.T) { + pr, err := net.ParsePortRange("10000-10200") + if err != nil { + t.Fatal(err) + } + + testCases := []sets.Int{ + sets.NewInt(), + sets.NewInt(10000), + sets.NewInt(10000, 10200), + sets.NewInt(10000, 10099, 10200), + } + + for i, tc := range testCases { + r := NewPortAllocator(*pr) + + for port := range tc { + if err := r.Allocate(port); err != nil { + t.Errorf("[%d] error allocating port %v: %v", i, port, err) + } + if !r.Has(port) { + t.Errorf("[%d] expected port %v allocated", i, port) + } + } + + calls := sets.NewInt() + r.ForEach(func(port int) { + calls.Insert(port) + }) + if len(calls) != len(tc) { + t.Errorf("[%d] expected %d calls, got %d", i, len(tc), len(calls)) + } + if !calls.Equal(tc) { + t.Errorf("[%d] expected calls to equal testcase: %v vs %v", i, calls.List(), tc.List()) + } + } } func TestSnapshot(t *testing.T) { @@ -155,3 +205,42 @@ func TestSnapshot(t *testing.T) { t.Errorf("counts do not match: %d", other.Free()) } } + +func TestNewFromSnapshot(t *testing.T) { + pr, err := net.ParsePortRange("200-300") + if err != nil { + t.Fatal(err) + } + r := NewPortAllocator(*pr) + allocated := []int{} + for i := 0; i < 50; i++ { + p, err := r.AllocateNext() + if err != nil { + t.Fatal(err) + } + allocated = append(allocated, p) + } + + snapshot := api.RangeAllocation{} + if err = r.Snapshot(&snapshot); err != nil { + t.Fatal(err) + } + + r, err = NewFromSnapshot(&snapshot) + if err != nil { + t.Fatal(err) + } + + if x := r.Free(); x != 51 { + t.Fatalf("expected 51 free ports, got %d", x) + } + if x := r.Used(); x != 50 { + t.Fatalf("expected 50 used port, got %d", x) + } + + for _, p := range allocated { + if !r.Has(p) { + t.Fatalf("expected port to be allocated, but it was not") + } + } +} diff --git a/pkg/registry/core/service/portallocator/controller/BUILD b/pkg/registry/core/service/portallocator/controller/BUILD index dc3e7420827..e64c8a963c5 100644 --- a/pkg/registry/core/service/portallocator/controller/BUILD +++ b/pkg/registry/core/service/portallocator/controller/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -24,3 +25,16 @@ go_library( "//pkg/util/wait:go_default_library", ], ) + +go_test( + name = "go_default_test", + srcs = ["repair_test.go"], + library = "go_default_library", + tags = ["automanaged"], + 
deps = [ + "//pkg/api:go_default_library", + "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/registry/core/service/portallocator:go_default_library", + "//pkg/util/net:go_default_library", + ], +) diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go index ceaa41f1ff8..9bba150c037 100644 --- a/pkg/registry/core/service/portallocator/controller/repair.go +++ b/pkg/registry/core/service/portallocator/controller/repair.go @@ -38,8 +38,13 @@ type Repair struct { serviceClient coreclient.ServicesGetter portRange net.PortRange alloc rangeallocation.RangeRegistry + leaks map[int]int // counter per leaked port } +// How many times we need to detect a leak before we clean up. This is to +// avoid races between allocating a port and using it. +const numRepairsBeforeLeakCleanup = 3 + // NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster // and generates informational warnings for a cluster that is not in sync. func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair { @@ -48,6 +53,7 @@ func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, serviceClient: serviceClient, portRange: portRange, alloc: alloc, + leaks: map[int]int{}, } } @@ -76,21 +82,28 @@ func (c *Repair) runOnce() error { // If etcd server is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and etcd at the same time. - var latest *api.RangeAllocation - var err error - for i := 0; i < 10; i++ { - if latest, err = c.alloc.Get(); err != nil { - time.Sleep(time.Second) - } else { - break - } - } + var snapshot *api.RangeAllocation + + err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + var err error + snapshot, err = c.alloc.Get() + return err == nil, err + }) if err != nil { - return fmt.Errorf("unable to refresh the port block: %v", err) + return fmt.Errorf("unable to refresh the port allocations: %v", err) + } + // If not yet initialized. + if snapshot.Range == "" { + snapshot.Range = c.portRange.String() + } + // Create an allocator because it is easy to use. + stored, err := portallocator.NewFromSnapshot(snapshot) + if err != nil { + return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err) } // We explicitly send no resource version, since the resource version - // of 'latest' is from a different collection, it's not comparable to + // of 'snapshot' is from a different collection, it's not comparable to // the service collection. The caching layer keeps per-collection RVs, // and this is proper, since in theory the collections could be hosted // in separate etcd (or even non-etcd) instances. @@ -99,7 +112,8 @@ func (c *Repair) runOnce() error { return fmt.Errorf("unable to refresh the port block: %v", err) } - r := portallocator.NewPortAllocator(c.portRange) + rebuilt := portallocator.NewPortAllocator(c.portRange) + // Check every Service's ports, and rebuild the state as we think it should be. 
for i := range list.Items { svc := &list.Items[i] ports := service.CollectServiceNodePorts(svc) @@ -108,18 +122,27 @@ func (c *Repair) runOnce() error { } for _, port := range ports { - switch err := r.Allocate(port); err { + switch err := rebuilt.Allocate(port); err { case nil: + if stored.Has(port) { + // remove it from the old set, so we can find leaks + stored.Release(port) + } else { + // doesn't seem to be allocated + runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace)) + } + delete(c.leaks, port) // it is used, so it can't be leaked case portallocator.ErrAllocated: // TODO: send event - // port is broken, reallocate - runtime.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace)) + // port is duplicate, reallocate + runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace)) case err.(*portallocator.ErrNotInRange): // TODO: send event - // port is broken, reallocate + // port is out of range, reallocate runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange)) case portallocator.ErrFull: // TODO: send event + // somehow we are out of ports return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange) default: return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err) @@ -127,12 +150,33 @@ func (c *Repair) runOnce() error { } } - err = r.Snapshot(latest) - if err != nil { + // Check for ports that are left in the old set. They appear to have been leaked. + stored.ForEach(func(port int) { + count, found := c.leaks[port] + switch { + case !found: + // flag it to be cleaned up after any races (hopefully) are gone + runtime.HandleError(fmt.Errorf("the node port %d may have leaked: flagging for later clean up", port)) + count = numRepairsBeforeLeakCleanup - 1 + fallthrough + case count > 0: + // pretend it is still in use until count expires + c.leaks[port] = count - 1 + if err := rebuilt.Allocate(port); err != nil { + runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err)) + } + default: + // do not add it to the rebuilt set, which means it will be available for reuse + runtime.HandleError(fmt.Errorf("the node port %d appears to have leaked: cleaning up", port)) + } + }) + + // Blast the rebuilt state into storage. + if err := rebuilt.Snapshot(snapshot); err != nil { return fmt.Errorf("unable to snapshot the updated port allocations: %v", err) } - if err := c.alloc.CreateOrUpdate(latest); err != nil { + if err := c.alloc.CreateOrUpdate(snapshot); err != nil { if errors.IsConflict(err) { return err } diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go new file mode 100644 index 00000000000..b5108d1e12b --- /dev/null +++ b/pkg/registry/core/service/portallocator/controller/repair_test.go @@ -0,0 +1,191 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "strings" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/registry/core/service/portallocator" + "k8s.io/kubernetes/pkg/util/net" +) + +type mockRangeRegistry struct { + getCalled bool + item *api.RangeAllocation + err error + + updateCalled bool + updated *api.RangeAllocation + updateErr error +} + +func (r *mockRangeRegistry) Get() (*api.RangeAllocation, error) { + r.getCalled = true + return r.item, r.err +} + +func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error { + r.updateCalled = true + r.updated = alloc + return r.updateErr +} + +func TestRepair(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + registry := &mockRangeRegistry{ + item: &api.RangeAllocation{Range: "100-200"}, + } + pr, _ := net.ParsePortRange(registry.item.Range) + r := NewRepair(0, fakeClient.Core(), *pr, registry) + + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + if !registry.updateCalled || registry.updated == nil || registry.updated.Range != pr.String() || registry.updated != registry.item { + t.Errorf("unexpected registry: %#v", registry) + } + + registry = &mockRangeRegistry{ + item: &api.RangeAllocation{Range: "100-200"}, + updateErr: fmt.Errorf("test error"), + } + r = NewRepair(0, fakeClient.Core(), *pr, registry) + if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") { + t.Fatal(err) + } +} + +func TestRepairLeak(t *testing.T) { + pr, _ := net.ParsePortRange("100-200") + previous := portallocator.NewPortAllocator(*pr) + previous.Allocate(111) + + var dst api.RangeAllocation + err := previous.Snapshot(&dst) + if err != nil { + t.Fatal(err) + } + + fakeClient := fake.NewSimpleClientset() + registry := &mockRangeRegistry{ + item: &api.RangeAllocation{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "1", + }, + Range: dst.Range, + Data: dst.Data, + }, + } + + r := NewRepair(0, fakeClient.Core(), *pr, registry) + // Run through the "leak detection holdoff" loops. + for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ { + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + after, err := portallocator.NewFromSnapshot(registry.updated) + if err != nil { + t.Fatal(err) + } + if !after.Has(111) { + t.Errorf("expected portallocator to still have leaked port") + } + } + // Run one more time to actually remove the leak. 
+ if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + after, err := portallocator.NewFromSnapshot(registry.updated) + if err != nil { + t.Fatal(err) + } + if after.Has(111) { + t.Errorf("expected portallocator to not have leaked port") + } +} + +func TestRepairWithExisting(t *testing.T) { + pr, _ := net.ParsePortRange("100-200") + previous := portallocator.NewPortAllocator(*pr) + + var dst api.RangeAllocation + err := previous.Snapshot(&dst) + if err != nil { + t.Fatal(err) + } + + fakeClient := fake.NewSimpleClientset( + &api.Service{ + ObjectMeta: api.ObjectMeta{Namespace: "one", Name: "one"}, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{NodePort: 111}}, + }, + }, + &api.Service{ + ObjectMeta: api.ObjectMeta{Namespace: "two", Name: "two"}, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{NodePort: 122}, {NodePort: 133}}, + }, + }, + &api.Service{ // outside range, will be dropped + ObjectMeta: api.ObjectMeta{Namespace: "three", Name: "three"}, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{NodePort: 201}}, + }, + }, + &api.Service{ // empty, ignored + ObjectMeta: api.ObjectMeta{Namespace: "four", Name: "four"}, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{}}, + }, + }, + &api.Service{ // duplicate, dropped + ObjectMeta: api.ObjectMeta{Namespace: "five", Name: "five"}, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{NodePort: 111}}, + }, + }, + ) + + registry := &mockRangeRegistry{ + item: &api.RangeAllocation{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "1", + }, + Range: dst.Range, + Data: dst.Data, + }, + } + r := NewRepair(0, fakeClient.Core(), *pr, registry) + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + after, err := portallocator.NewFromSnapshot(registry.updated) + if err != nil { + t.Fatal(err) + } + if !after.Has(111) || !after.Has(122) || !after.Has(133) { + t.Errorf("unexpected portallocator state: %#v", after) + } + if free := after.Free(); free != 98 { + t.Errorf("unexpected portallocator state: %d free", free) + } +} diff --git a/pkg/volume/glusterfs/BUILD b/pkg/volume/glusterfs/BUILD index 5a27ef9c256..55fed5f58e9 100644 --- a/pkg/volume/glusterfs/BUILD +++ b/pkg/volume/glusterfs/BUILD @@ -25,7 +25,6 @@ go_library( "//pkg/apis/storage/v1beta1/util:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/labels:go_default_library", - "//pkg/registry/core/service/allocator:go_default_library", "//pkg/types:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/mount:go_default_library", diff --git a/pkg/volume/glusterfs/glusterfs_minmax.go b/pkg/volume/glusterfs/glusterfs_minmax.go index 72da43e7a34..d504e3fe083 100644 --- a/pkg/volume/glusterfs/glusterfs_minmax.go +++ b/pkg/volume/glusterfs/glusterfs_minmax.go @@ -25,8 +25,6 @@ package glusterfs import ( "errors" "sync" - - "k8s.io/kubernetes/pkg/registry/core/service/allocator" ) var ( @@ -51,7 +49,11 @@ var _ Rangeable = &MinMaxAllocator{} // Rangeable is an Interface that can adjust its min/max range. // Rangeable should be threadsafe type Rangeable interface { - allocator.Interface + Allocate(int) (bool, error) + AllocateNext() (int, bool, error) + Release(int) error + Has(int) bool + Free() int SetRange(min, max int) error }
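To make the leak handling above easier to follow, here is a standalone sketch (not part of the patch) that isolates the holdoff counter both repair controllers now keep per leaked allocation; leakTracker and observeLeak are illustrative names, and only the constant comes from the patch.

package main

import "fmt"

// Same constant both repair controllers define in the patch.
const numRepairsBeforeLeakCleanup = 3

// leakTracker keeps a countdown per leaked item, like the leaks map on Repair.
type leakTracker struct {
	leaks map[int]int
}

// observeLeak is called once per repair pass for an item that is present in the
// stored snapshot but not referenced by any Service. It returns true while the
// item should still be treated as allocated, and false once it may be reclaimed.
func (t *leakTracker) observeLeak(item int) bool {
	count, found := t.leaks[item]
	switch {
	case !found:
		// First sighting: flag it and hold off cleanup for a few passes.
		count = numRepairsBeforeLeakCleanup - 1
		fallthrough
	case count > 0:
		// Pretend it is still in use until the count expires.
		t.leaks[item] = count - 1
		return true
	default:
		// Count expired: let the item become available for reuse.
		return false
	}
}

func main() {
	t := &leakTracker{leaks: map[int]int{}}
	for pass := 1; pass <= numRepairsBeforeLeakCleanup; pass++ {
		// Prints true, true, false: the item survives two repair passes, then is reclaimed.
		fmt.Printf("pass %d: keep node port 30080 allocated? %v\n", pass, t.observeLeak(30080))
	}
}

Holding a suspected leak for a few passes instead of releasing it immediately is what closes the race between an allocation being persisted and the Service that uses it becoming visible to the repair loop.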