Merge pull request #39206 from thockin/allocator-repair-race

Automatic merge from submit-queue (batch tested with PRs 39250, 39206)

WIP: work around for IP and port Allocator repair race

Fixes #37488

WIP: This does the IP allocator but not the port allocator yet.  Sending for review before I clone the deltas for ports.

Idea: force the repair loop to detect a leak 3 times in a row before actually releasing the IP.  That should allow any distributed races to resolve.  It's a little hacky, but without mutual exclusion or proper transactions, it works.
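For reference, a minimal, self-contained sketch of that holdoff idea (the names leakTracker and observeLeak are illustrative, not the actual repair controller further down in this diff): keep a per-IP counter and only treat an address as truly leaked once it has been flagged on three consecutive repair passes.

package main

import "fmt"

// Sketch only (assumed names): a per-key counter with a three-pass holdoff
// before a suspected leak is actually released.
const numRepairsBeforeLeakCleanup = 3

type leakTracker struct {
	leaks map[string]int // counter per suspected-leaked key
}

// observeLeak is called once per repair pass for each key that looks leaked.
// It reports true only after the key has looked leaked on three consecutive
// passes, giving in-flight allocations time to show up in the Service list.
func (t *leakTracker) observeLeak(key string) bool {
	count, found := t.leaks[key]
	if !found {
		count = numRepairsBeforeLeakCleanup
	}
	count--
	if count <= 0 {
		delete(t.leaks, key)
		return true // safe to release now
	}
	t.leaks[key] = count // keep it reserved for another pass
	return false
}

func main() {
	t := &leakTracker{leaks: map[string]int{}}
	for pass := 1; pass <= 3; pass++ {
		// pass 1: false, pass 2: false, pass 3: true
		fmt.Printf("pass %d: release=%v\n", pass, t.observeLeak("10.0.0.42"))
	}
}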
This commit is contained in:
Kubernetes Submit Queue 2016-12-27 12:24:08 -08:00 committed by GitHub
commit 9c2bd62159
18 changed files with 667 additions and 50 deletions

View File

@ -100,7 +100,7 @@ message ClusterSpec {
optional k8s.io.kubernetes.pkg.api.v1.LocalObjectReference secretRef = 2;
}
// ClusterStatus is information about the current status of a cluster updated by cluster controller peridocally.
// ClusterStatus is information about the current status of a cluster updated by cluster controller periodically.
message ClusterStatus {
// Conditions is an array of current cluster conditions.
// +optional

View File

@ -65,6 +65,8 @@ kube::log::status "Starting kube-apiserver"
--cert-dir="${TMP_DIR}/certs" \
--runtime-config="api/all=true" \
--token-auth-file=$TMP_DIR/tokenauth.csv \
--logtostderr \
--v=2 \
--service-cluster-ip-range="10.0.0.0/24" >/tmp/openapi-api-server.log 2>&1 &
APISERVER_PID=$!

View File

@ -26,5 +26,5 @@ go_test(
],
library = "go_default_library",
tags = ["automanaged"],
deps = [],
deps = ["//vendor:k8s.io/client-go/pkg/util/sets"],
)

View File

@ -124,6 +124,33 @@ func (r *AllocationBitmap) Release(offset int) error {
return nil
}
const (
// Find the size of a big.Word in bytes.
notZero = uint64(^big.Word(0))
wordPower = (notZero>>8)&1 + (notZero>>16)&1 + (notZero>>32)&1
wordSize = 1 << wordPower
)
// ForEach calls the provided function for each allocated bit. The
// AllocationBitmap may not be modified while this loop is running.
func (r *AllocationBitmap) ForEach(fn func(int)) {
r.lock.Lock()
defer r.lock.Unlock()
words := r.allocated.Bits()
for wordIdx, word := range words {
bit := 0
for word > 0 {
if (word & 1) != 0 {
fn((wordIdx * wordSize * 8) + bit)
word = word &^ 1
}
bit++
word = word >> 1
}
}
}
// Has returns true if the provided item is already allocated and a call
// to Allocate(offset) would fail.
func (r *AllocationBitmap) Has(offset int) bool {
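Aside on the const block in this hunk: ^big.Word(0) sets every bit of a machine word, so each shift-and-mask term checks whether the word is wider than 8, 16, and 32 bits; summed, they give log2 of the word size in bytes, and wordSize is therefore 4 on 32-bit builds and 8 on 64-bit builds. A standalone sketch (the print in main is only for illustration):

package main

import (
	"fmt"
	"math/big"
	"unsafe"
)

const (
	// All-ones word, viewed as uint64 so it can be shifted portably.
	notZero = uint64(^big.Word(0))
	// 2 on 32-bit platforms, 3 on 64-bit platforms.
	wordPower = (notZero>>8)&1 + (notZero>>16)&1 + (notZero>>32)&1
	// Size of a big.Word in bytes: 1 << wordPower.
	wordSize = 1 << wordPower
)

func main() {
	// On a 64-bit build both values are 8.
	fmt.Printf("wordSize=%d, unsafe.Sizeof(big.Word(0))=%d\n", wordSize, unsafe.Sizeof(big.Word(0)))
}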

View File

@ -18,6 +18,8 @@ package allocator
import (
"testing"
"k8s.io/client-go/pkg/util/sets"
)
func TestAllocate(t *testing.T) {
@ -82,6 +84,37 @@ func TestRelease(t *testing.T) {
}
}
func TestForEach(t *testing.T) {
testCases := []sets.Int{
sets.NewInt(),
sets.NewInt(0),
sets.NewInt(0, 2, 5, 9),
sets.NewInt(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
}
for i, tc := range testCases {
m := NewAllocationMap(10, "test")
for offset := range tc {
if ok, _ := m.Allocate(offset); !ok {
t.Errorf("[%d] error allocate offset %v", i, offset)
}
if !m.Has(offset) {
t.Errorf("[%d] expect offset %v allocated", i, offset)
}
}
calls := sets.NewInt()
m.ForEach(func(i int) {
calls.Insert(i)
})
if len(calls) != len(tc) {
t.Errorf("[%d] expected %d calls, got %d", i, len(tc), len(calls))
}
if !calls.Equal(tc) {
t.Errorf("[%d] expected calls to equal testcase: %v vs %v", i, calls.List(), tc.List())
}
}
}
func TestSnapshotAndRestore(t *testing.T) {
offset := 3
m := NewAllocationMap(10, "test")

View File

@ -144,6 +144,12 @@ func (e *Etcd) Release(item int) error {
})
}
func (e *Etcd) ForEach(fn func(int)) {
e.lock.Lock()
defer e.lock.Unlock()
e.alloc.ForEach(fn)
}
// tryUpdate performs a read-update to persist the latest snapshot state of allocation.
func (e *Etcd) tryUpdate(fn func() error) error {
err := e.storage.GuaranteedUpdate(context.TODO(), e.baseKey, &api.RangeAllocation{}, true, nil,

View File

@ -22,6 +22,7 @@ type Interface interface {
Allocate(int) (bool, error)
AllocateNext() (int, bool, error)
Release(int) error
ForEach(func(int))
// For testing
Has(int) bool

View File

@ -32,6 +32,7 @@ type Interface interface {
Allocate(net.IP) error
AllocateNext() (net.IP, error)
Release(net.IP) error
ForEach(func(net.IP))
}
var (
@ -89,6 +90,19 @@ func NewCIDRRange(cidr *net.IPNet) *Range {
})
}
// NewFromSnapshot allocates a Range and initializes it from a snapshot.
func NewFromSnapshot(snap *api.RangeAllocation) (*Range, error) {
_, ipnet, err := net.ParseCIDR(snap.Range)
if err != nil {
return nil, err
}
r := NewCIDRRange(ipnet)
if err := r.Restore(ipnet, snap.Data); err != nil {
return nil, err
}
return r, nil
}
func maximum(a, b int) int {
if a > b {
return a
@ -101,6 +115,16 @@ func (r *Range) Free() int {
return r.alloc.Free()
}
// Used returns the count of IP addresses used in the range.
func (r *Range) Used() int {
return r.max - r.alloc.Free()
}
// CIDR returns the CIDR covered by the range.
func (r *Range) CIDR() net.IPNet {
return *r.net
}
// Allocate attempts to reserve the provided IP. ErrNotInRange or
// ErrAllocated will be returned if the IP is not valid for this range
// or has already been reserved. ErrFull will be returned if there
@ -146,6 +170,14 @@ func (r *Range) Release(ip net.IP) error {
return r.alloc.Release(offset)
}
// ForEach calls the provided function for each allocated IP.
func (r *Range) ForEach(fn func(net.IP)) {
r.alloc.ForEach(func(offset int) {
ip, _ := GetIndexedIP(r.net, offset+1) // +1 because Range doesn't store IP 0
fn(ip)
})
}
// Has returns true if the provided IP is already allocated and a call
// to Allocate(ip) would fail with ErrAllocated.
func (r *Range) Has(ip net.IP) bool {
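Aside on the offset+1 in ForEach above: bitmap offset 0 corresponds to the first usable address in the CIDR, because the network address itself is never allocated. A rough, self-contained illustration of that mapping (indexedIP is a simplified stand-in for GetIndexedIP, IPv4-only and without bounds checks):

package main

import (
	"fmt"
	"math/big"
	"net"
)

// indexedIP returns base-of-CIDR + index as an IP (simplified, no range check).
func indexedIP(cidr *net.IPNet, index int) net.IP {
	base := new(big.Int).SetBytes(cidr.IP.To4())
	sum := new(big.Int).Add(base, big.NewInt(int64(index)))
	return net.IP(sum.Bytes()).To4()
}

func main() {
	_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
	// Bitmap offset 0 maps to 192.168.1.1, offset 253 to 192.168.1.254.
	fmt.Println(indexedIP(cidr, 0+1))   // 192.168.1.1
	fmt.Println(indexedIP(cidr, 253+1)) // 192.168.1.254
}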

View File

@ -34,6 +34,9 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != 254 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 0 {
t.Errorf("unexpected used %d", f)
}
found := sets.NewString()
count := 0
for r.Free() > 0 {
@ -61,6 +64,9 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != 1 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 253 {
t.Errorf("unexpected free %d", f)
}
ip, err := r.AllocateNext()
if err != nil {
t.Fatal(err)
@ -87,12 +93,18 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != 1 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 253 {
t.Errorf("unexpected free %d", f)
}
if err := r.Allocate(released); err != nil {
t.Fatal(err)
}
if f := r.Free(); f != 0 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 254 {
t.Errorf("unexpected free %d", f)
}
}
func TestAllocateTiny(t *testing.T) {
@ -167,6 +179,43 @@ func TestRangeSize(t *testing.T) {
}
}
func TestForEach(t *testing.T) {
_, cidr, err := net.ParseCIDR("192.168.1.0/24")
if err != nil {
t.Fatal(err)
}
testCases := []sets.String{
sets.NewString(),
sets.NewString("192.168.1.1"),
sets.NewString("192.168.1.1", "192.168.1.254"),
sets.NewString("192.168.1.1", "192.168.1.128", "192.168.1.254"),
}
for i, tc := range testCases {
r := NewCIDRRange(cidr)
for ips := range tc {
ip := net.ParseIP(ips)
if err := r.Allocate(ip); err != nil {
t.Errorf("[%d] error allocating IP %v: %v", i, ip, err)
}
if !r.Has(ip) {
t.Errorf("[%d] expected IP %v allocated", i, ip)
}
}
calls := sets.NewString()
r.ForEach(func(ip net.IP) {
calls.Insert(ip.String())
})
if len(calls) != len(tc) {
t.Errorf("[%d] expected %d calls, got %d", i, len(tc), len(calls))
}
if !calls.Equal(tc) {
t.Errorf("[%d] expected calls to equal testcase: %v vs %v", i, calls.List(), tc.List())
}
}
}
func TestSnapshot(t *testing.T) {
_, cidr, err := net.ParseCIDR("192.168.1.0/24")
if err != nil {
@ -219,3 +268,42 @@ func TestSnapshot(t *testing.T) {
t.Errorf("counts do not match: %d", other.Free())
}
}
func TestNewFromSnapshot(t *testing.T) {
_, cidr, err := net.ParseCIDR("192.168.0.0/24")
if err != nil {
t.Fatal(err)
}
r := NewCIDRRange(cidr)
allocated := []net.IP{}
for i := 0; i < 128; i++ {
ip, err := r.AllocateNext()
if err != nil {
t.Fatal(err)
}
allocated = append(allocated, ip)
}
snapshot := api.RangeAllocation{}
if err = r.Snapshot(&snapshot); err != nil {
t.Fatal(err)
}
r, err = NewFromSnapshot(&snapshot)
if err != nil {
t.Fatal(err)
}
if x := r.Free(); x != 126 {
t.Fatalf("expected 126 free IPs, got %d", x)
}
if x := r.Used(); x != 128 {
t.Fatalf("expected 128 used IPs, got %d", x)
}
for _, ip := range allocated {
if !r.Has(ip) {
t.Fatalf("expected IP to be allocated, but it was not")
}
}
}

View File

@ -51,8 +51,13 @@ type Repair struct {
serviceClient coreclient.ServicesGetter
network *net.IPNet
alloc rangeallocation.RangeRegistry
leaks map[string]int // counter per leaked IP
}
// How many times we need to detect a leak before we clean up. This is to
// avoid races between allocating an IP and using it.
const numRepairsBeforeLeakCleanup = 3
// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
@ -61,6 +66,7 @@ func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter,
serviceClient: serviceClient,
network: network,
alloc: alloc,
leaks: map[string]int{},
}
}
@ -89,18 +95,27 @@ func (c *Repair) runOnce() error {
// If etcd server is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and etcd at the same time.
var latest *api.RangeAllocation
var err error
err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
latest, err = c.alloc.Get()
var snapshot *api.RangeAllocation
err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
var err error
snapshot, err = c.alloc.Get()
return err == nil, err
})
if err != nil {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
}
// If not yet initialized.
if snapshot.Range == "" {
snapshot.Range = c.network.String()
}
// Create an allocator because it is easy to use.
stored, err := ipallocator.NewFromSnapshot(snapshot)
if err != nil {
return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
}
// We explicitly send no resource version, since the resource version
// of 'latest' is from a different collection, it's not comparable to
// of 'snapshot' is from a different collection, it's not comparable to
// the service collection. The caching layer keeps per-collection RVs,
// and this is proper, since in theory the collections could be hosted
// in separate etcd (or even non-etcd) instances.
@ -109,40 +124,73 @@ func (c *Repair) runOnce() error {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
}
r := ipallocator.NewCIDRRange(c.network)
rebuilt := ipallocator.NewCIDRRange(c.network)
// Check every Service's ClusterIP, and rebuild the state as we think it should be.
for _, svc := range list.Items {
if !api.IsServiceIPSet(&svc) {
// didn't need a cluster IP
continue
}
ip := net.ParseIP(svc.Spec.ClusterIP)
if ip == nil {
// cluster IP is broken, reallocate
// cluster IP is corrupt
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
continue
}
switch err := r.Allocate(ip); err {
// mark it as in-use
switch err := rebuilt.Allocate(ip); err {
case nil:
if stored.Has(ip) {
// remove it from the old set, so we can find leaks
stored.Release(ip)
} else {
// cluster IP doesn't seem to be allocated
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
}
delete(c.leaks, ip.String()) // it is used, so it can't be leaked
case ipallocator.ErrAllocated:
// TODO: send event
// cluster IP is broken, reallocate
// cluster IP is duplicate
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace))
case ipallocator.ErrNotInRange:
// TODO: send event
// cluster IP is broken, reallocate
// cluster IP is out of range
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network))
case ipallocator.ErrFull:
// TODO: send event
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", r)
// somehow we are out of IPs
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", rebuilt)
default:
return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err)
}
}
if err := r.Snapshot(latest); err != nil {
// Check for IPs that are left in the old set. They appear to have been leaked.
stored.ForEach(func(ip net.IP) {
count, found := c.leaks[ip.String()]
switch {
case !found:
// flag it to be cleaned up after any races (hopefully) are gone
runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked: flagging for later clean up", ip))
count = numRepairsBeforeLeakCleanup - 1
fallthrough
case count > 0:
// pretend it is still in use until count expires
c.leaks[ip.String()] = count - 1
if err := rebuilt.Allocate(ip); err != nil {
runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err))
}
default:
// do not add it to the rebuilt set, which means it will be available for reuse
runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
}
})
// Blast the rebuilt state into storage.
if err := rebuilt.Snapshot(snapshot); err != nil {
return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
}
if err := c.alloc.CreateOrUpdate(latest); err != nil {
if err := c.alloc.CreateOrUpdate(snapshot); err != nil {
if errors.IsConflict(err) {
return err
}

View File

@ -50,10 +50,10 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
func TestRepair(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{},
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
}
_, cidr, _ := net.ParseCIDR(ipregistry.item.Range)
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); err != nil {
@ -64,7 +64,7 @@ func TestRepair(t *testing.T) {
}
ipregistry = &mockRangeRegistry{
item: &api.RangeAllocation{},
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, fakeClient.Core(), cidr, ipregistry)
@ -73,7 +73,7 @@ func TestRepair(t *testing.T) {
}
}
func TestRepairEmpty(t *testing.T) {
func TestRepairLeak(t *testing.T) {
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
previous := ipallocator.NewCIDRRange(cidr)
previous.Allocate(net.ParseIP("192.168.1.10"))
@ -94,16 +94,31 @@ func TestRepairEmpty(t *testing.T) {
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
// Run through the "leak detection holdoff" loops.
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
if err != nil {
t.Fatal(err)
}
if !after.Has(net.ParseIP("192.168.1.10")) {
t.Errorf("expected ipallocator to still have leaked IP")
}
}
// Run one more time to actually remove the leak.
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after := ipallocator.NewCIDRRange(cidr)
if err := after.Restore(cidr, ipregistry.updated.Data); err != nil {
after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
if err != nil {
t.Fatal(err)
}
if after.Has(net.ParseIP("192.168.1.10")) {
t.Errorf("unexpected ipallocator state: %#v", after)
t.Errorf("expected ipallocator to not have leaked IP")
}
}
@ -157,14 +172,14 @@ func TestRepairWithExisting(t *testing.T) {
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after := ipallocator.NewCIDRRange(cidr)
if err := after.Restore(cidr, ipregistry.updated.Data); err != nil {
after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
if err != nil {
t.Fatal(err)
}
if !after.Has(net.ParseIP("192.168.1.1")) || !after.Has(net.ParseIP("192.168.1.100")) {
t.Errorf("unexpected ipallocator state: %#v", after)
}
if after.Free() != 252 {
t.Errorf("unexpected ipallocator state: %#v", after)
if free := after.Free(); free != 252 {
t.Errorf("unexpected ipallocator state: %d free", free)
}
}

View File

@ -33,6 +33,7 @@ type Interface interface {
Allocate(int) error
AllocateNext() (int, error)
Release(int) error
ForEach(func(int))
}
var (
@ -77,11 +78,29 @@ func NewPortAllocator(pr net.PortRange) *PortAllocator {
})
}
// NewFromSnapshot allocates a PortAllocator and initializes it from a snapshot.
func NewFromSnapshot(snap *api.RangeAllocation) (*PortAllocator, error) {
pr, err := net.ParsePortRange(snap.Range)
if err != nil {
return nil, err
}
r := NewPortAllocator(*pr)
if err := r.Restore(*pr, snap.Data); err != nil {
return nil, err
}
return r, nil
}
// Free returns the count of ports left in the range.
func (r *PortAllocator) Free() int {
return r.alloc.Free()
}
// Used returns the count of ports used in the range.
func (r *PortAllocator) Used() int {
return r.portRange.Size - r.alloc.Free()
}
// Allocate attempts to reserve the provided port. ErrNotInRange or
// ErrAllocated will be returned if the port is not valid for this range
// or has already been reserved. ErrFull will be returned if there
@ -117,6 +136,13 @@ func (r *PortAllocator) AllocateNext() (int, error) {
return r.portRange.Base + offset, nil
}
// ForEach calls the provided function for each allocated port.
func (r *PortAllocator) ForEach(fn func(int)) {
r.alloc.ForEach(func(offset int) {
fn(r.portRange.Base + offset)
})
}
// Release releases the port back to the pool. Releasing an
// unallocated port or a port out of the range is a no-op and
// returns no error.

View File

@ -35,6 +35,9 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != 201 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 0 {
t.Errorf("unexpected used %d", f)
}
found := sets.NewString()
count := 0
for r.Free() > 0 {
@ -62,6 +65,9 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != 1 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 200 {
t.Errorf("unexpected used %d", f)
}
p, err := r.AllocateNext()
if err != nil {
t.Fatal(err)
@ -95,12 +101,56 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != 1 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 200 {
t.Errorf("unexpected used %d", f)
}
if err := r.Allocate(released); err != nil {
t.Fatal(err)
}
if f := r.Free(); f != 0 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 201 {
t.Errorf("unexpected used %d", f)
}
}
func TestForEach(t *testing.T) {
pr, err := net.ParsePortRange("10000-10200")
if err != nil {
t.Fatal(err)
}
testCases := []sets.Int{
sets.NewInt(),
sets.NewInt(10000),
sets.NewInt(10000, 10200),
sets.NewInt(10000, 10099, 10200),
}
for i, tc := range testCases {
r := NewPortAllocator(*pr)
for port := range tc {
if err := r.Allocate(port); err != nil {
t.Errorf("[%d] error allocating port %v: %v", i, port, err)
}
if !r.Has(port) {
t.Errorf("[%d] expected port %v allocated", i, port)
}
}
calls := sets.NewInt()
r.ForEach(func(port int) {
calls.Insert(port)
})
if len(calls) != len(tc) {
t.Errorf("[%d] expected %d calls, got %d", i, len(tc), len(calls))
}
if !calls.Equal(tc) {
t.Errorf("[%d] expected calls to equal testcase: %v vs %v", i, calls.List(), tc.List())
}
}
}
func TestSnapshot(t *testing.T) {
@ -155,3 +205,42 @@ func TestSnapshot(t *testing.T) {
t.Errorf("counts do not match: %d", other.Free())
}
}
func TestNewFromSnapshot(t *testing.T) {
pr, err := net.ParsePortRange("200-300")
if err != nil {
t.Fatal(err)
}
r := NewPortAllocator(*pr)
allocated := []int{}
for i := 0; i < 50; i++ {
p, err := r.AllocateNext()
if err != nil {
t.Fatal(err)
}
allocated = append(allocated, p)
}
snapshot := api.RangeAllocation{}
if err = r.Snapshot(&snapshot); err != nil {
t.Fatal(err)
}
r, err = NewFromSnapshot(&snapshot)
if err != nil {
t.Fatal(err)
}
if x := r.Free(); x != 51 {
t.Fatalf("expected 51 free ports, got %d", x)
}
if x := r.Used(); x != 50 {
t.Fatalf("expected 50 used port, got %d", x)
}
for _, p := range allocated {
if !r.Has(p) {
t.Fatalf("expected port to be allocated, but it was not")
}
}
}

View File

@ -5,6 +5,7 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -24,3 +25,16 @@ go_library(
"//pkg/util/wait:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["repair_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//pkg/util/net:go_default_library",
],
)

View File

@ -38,8 +38,13 @@ type Repair struct {
serviceClient coreclient.ServicesGetter
portRange net.PortRange
alloc rangeallocation.RangeRegistry
leaks map[int]int // counter per leaked port
}
// How many times we need to detect a leak before we clean up. This is to
// avoid races between allocating a port and using it.
const numRepairsBeforeLeakCleanup = 3
// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
@ -48,6 +53,7 @@ func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter,
serviceClient: serviceClient,
portRange: portRange,
alloc: alloc,
leaks: map[int]int{},
}
}
@ -76,21 +82,28 @@ func (c *Repair) runOnce() error {
// If etcd server is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and etcd at the same time.
var latest *api.RangeAllocation
var err error
for i := 0; i < 10; i++ {
if latest, err = c.alloc.Get(); err != nil {
time.Sleep(time.Second)
} else {
break
}
}
var snapshot *api.RangeAllocation
err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
var err error
snapshot, err = c.alloc.Get()
return err == nil, err
})
if err != nil {
return fmt.Errorf("unable to refresh the port block: %v", err)
return fmt.Errorf("unable to refresh the port allocations: %v", err)
}
// If not yet initialized.
if snapshot.Range == "" {
snapshot.Range = c.portRange.String()
}
// Create an allocator because it is easy to use.
stored, err := portallocator.NewFromSnapshot(snapshot)
if err != nil {
return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
}
// We explicitly send no resource version, since the resource version
// of 'latest' is from a different collection, it's not comparable to
// of 'snapshot' is from a different collection, it's not comparable to
// the service collection. The caching layer keeps per-collection RVs,
// and this is proper, since in theory the collections could be hosted
// in separate etcd (or even non-etcd) instances.
@ -99,7 +112,8 @@ func (c *Repair) runOnce() error {
return fmt.Errorf("unable to refresh the port block: %v", err)
}
r := portallocator.NewPortAllocator(c.portRange)
rebuilt := portallocator.NewPortAllocator(c.portRange)
// Check every Service's ports, and rebuild the state as we think it should be.
for i := range list.Items {
svc := &list.Items[i]
ports := service.CollectServiceNodePorts(svc)
@ -108,18 +122,27 @@ func (c *Repair) runOnce() error {
}
for _, port := range ports {
switch err := r.Allocate(port); err {
switch err := rebuilt.Allocate(port); err {
case nil:
if stored.Has(port) {
// remove it from the old set, so we can find leaks
stored.Release(port)
} else {
// doesn't seem to be allocated
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
}
delete(c.leaks, port) // it is used, so it can't be leaked
case portallocator.ErrAllocated:
// TODO: send event
// port is broken, reallocate
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
// port is duplicate, reallocate
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
case err.(*portallocator.ErrNotInRange):
// TODO: send event
// port is broken, reallocate
// port is out of range, reallocate
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
case portallocator.ErrFull:
// TODO: send event
// somehow we are out of ports
return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)
default:
return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
@ -127,12 +150,33 @@ func (c *Repair) runOnce() error {
}
}
err = r.Snapshot(latest)
if err != nil {
// Check for ports that are left in the old set. They appear to have been leaked.
stored.ForEach(func(port int) {
count, found := c.leaks[port]
switch {
case !found:
// flag it to be cleaned up after any races (hopefully) are gone
runtime.HandleError(fmt.Errorf("the node port %d may have leaked: flagging for later clean up", port))
count = numRepairsBeforeLeakCleanup - 1
fallthrough
case count > 0:
// pretend it is still in use until count expires
c.leaks[port] = count - 1
if err := rebuilt.Allocate(port); err != nil {
runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
}
default:
// do not add it to the rebuilt set, which means it will be available for reuse
runtime.HandleError(fmt.Errorf("the node port %d appears to have leaked: cleaning up", port))
}
})
// Blast the rebuilt state into storage.
if err := rebuilt.Snapshot(snapshot); err != nil {
return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
}
if err := c.alloc.CreateOrUpdate(latest); err != nil {
if err := c.alloc.CreateOrUpdate(snapshot); err != nil {
if errors.IsConflict(err) {
return err
}

View File

@ -0,0 +1,191 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
"k8s.io/kubernetes/pkg/util/net"
)
type mockRangeRegistry struct {
getCalled bool
item *api.RangeAllocation
err error
updateCalled bool
updated *api.RangeAllocation
updateErr error
}
func (r *mockRangeRegistry) Get() (*api.RangeAllocation, error) {
r.getCalled = true
return r.item, r.err
}
func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
r.updateCalled = true
r.updated = alloc
return r.updateErr
}
func TestRepair(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
registry := &mockRangeRegistry{
item: &api.RangeAllocation{Range: "100-200"},
}
pr, _ := net.ParsePortRange(registry.item.Range)
r := NewRepair(0, fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
if !registry.updateCalled || registry.updated == nil || registry.updated.Range != pr.String() || registry.updated != registry.item {
t.Errorf("unexpected registry: %#v", registry)
}
registry = &mockRangeRegistry{
item: &api.RangeAllocation{Range: "100-200"},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
}
func TestRepairLeak(t *testing.T) {
pr, _ := net.ParsePortRange("100-200")
previous := portallocator.NewPortAllocator(*pr)
previous.Allocate(111)
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
fakeClient := fake.NewSimpleClientset()
registry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: api.ObjectMeta{
ResourceVersion: "1",
},
Range: dst.Range,
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.Core(), *pr, registry)
// Run through the "leak detection holdoff" loops.
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := portallocator.NewFromSnapshot(registry.updated)
if err != nil {
t.Fatal(err)
}
if !after.Has(111) {
t.Errorf("expected portallocator to still have leaked port")
}
}
// Run one more time to actually remove the leak.
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := portallocator.NewFromSnapshot(registry.updated)
if err != nil {
t.Fatal(err)
}
if after.Has(111) {
t.Errorf("expected portallocator to not have leaked port")
}
}
func TestRepairWithExisting(t *testing.T) {
pr, _ := net.ParsePortRange("100-200")
previous := portallocator.NewPortAllocator(*pr)
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
fakeClient := fake.NewSimpleClientset(
&api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "one", Name: "one"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 111}},
},
},
&api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "two", Name: "two"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 122}, {NodePort: 133}},
},
},
&api.Service{ // outside range, will be dropped
ObjectMeta: api.ObjectMeta{Namespace: "three", Name: "three"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 201}},
},
},
&api.Service{ // empty, ignored
ObjectMeta: api.ObjectMeta{Namespace: "four", Name: "four"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{}},
},
},
&api.Service{ // duplicate, dropped
ObjectMeta: api.ObjectMeta{Namespace: "five", Name: "five"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 111}},
},
},
)
registry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: api.ObjectMeta{
ResourceVersion: "1",
},
Range: dst.Range,
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := portallocator.NewFromSnapshot(registry.updated)
if err != nil {
t.Fatal(err)
}
if !after.Has(111) || !after.Has(122) || !after.Has(133) {
t.Errorf("unexpected portallocator state: %#v", after)
}
if free := after.Free(); free != 98 {
t.Errorf("unexpected portallocator state: %d free", free)
}
}

View File

@ -25,7 +25,6 @@ go_library(
"//pkg/apis/storage/v1beta1/util:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/registry/core/service/allocator:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/mount:go_default_library",

View File

@ -25,8 +25,6 @@ package glusterfs
import (
"errors"
"sync"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
)
var (
@ -51,7 +49,11 @@ var _ Rangeable = &MinMaxAllocator{}
// Rangeable is an Interface that can adjust its min/max range.
// Rangeable should be threadsafe
type Rangeable interface {
allocator.Interface
Allocate(int) (bool, error)
AllocateNext() (int, bool, error)
Release(int) error
Has(int) bool
Free() int
SetRange(min, max int) error
}