Fix race in service nodeport allocator repair loop

Tim Hockin 2016-12-25 21:56:18 -08:00
parent 103b7c01c1
commit 64f5b050a1
6 changed files with 338 additions and 20 deletions

View File

@@ -100,7 +100,7 @@ message ClusterSpec {
optional k8s.io.kubernetes.pkg.api.v1.LocalObjectReference secretRef = 2;
}
// ClusterStatus is information about the current status of a cluster updated by cluster controller peridocally.
// ClusterStatus is information about the current status of a cluster updated by cluster controller periodically.
message ClusterStatus {
// Conditions is an array of current cluster conditions.
// +optional

View File

@@ -78,11 +78,29 @@ func NewPortAllocator(pr net.PortRange) *PortAllocator {
})
}
// NewFromSnapshot allocates a PortAllocator and initializes it from a snapshot.
func NewFromSnapshot(snap *api.RangeAllocation) (*PortAllocator, error) {
pr, err := net.ParsePortRange(snap.Range)
if err != nil {
return nil, err
}
r := NewPortAllocator(*pr)
if err := r.Restore(*pr, snap.Data); err != nil {
return nil, err
}
return r, nil
}
// Free returns the count of ports left in the range.
func (r *PortAllocator) Free() int {
return r.alloc.Free()
}
// Used returns the count of ports used in the range.
func (r *PortAllocator) Used() int {
return r.portRange.Size - r.alloc.Free()
}
// Allocate attempts to reserve the provided port. ErrNotInRange or
// ErrAllocated will be returned if the port is not valid for this range
// or has already been reserved. ErrFull will be returned if there

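The new NewFromSnapshot constructor and the Used counter give callers a way to rebuild a PortAllocator from a persisted api.RangeAllocation; the repair loop below uses this to compare the stored allocations against what Services actually reference. A minimal sketch of the snapshot round trip, using only functions that appear in this diff (the port range value is illustrative, not from the commit):

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
    "k8s.io/kubernetes/pkg/util/net"
)

func main() {
    // Parse a range and reserve one port.
    pr, err := net.ParsePortRange("30000-30100")
    if err != nil {
        panic(err)
    }
    alloc := portallocator.NewPortAllocator(*pr)
    port, err := alloc.AllocateNext()
    if err != nil {
        panic(err)
    }

    // Persist the allocator state into a RangeAllocation snapshot.
    var snap api.RangeAllocation
    if err := alloc.Snapshot(&snap); err != nil {
        panic(err)
    }

    // Rebuild a fresh allocator from the snapshot; the reservation survives.
    restored, err := portallocator.NewFromSnapshot(&snap)
    if err != nil {
        panic(err)
    }
    fmt.Println(restored.Has(port), restored.Used(), restored.Free())
}

The repair controller performs the same round trip at the end of each pass: it snapshots the rebuilt allocator and writes it back through the RangeRegistry.
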
View File

@@ -35,6 +35,9 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != 201 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 0 {
t.Errorf("unexpected used %d", f)
}
found := sets.NewString()
count := 0
for r.Free() > 0 {
@@ -62,6 +65,9 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != 1 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 200 {
t.Errorf("unexpected used %d", f)
}
p, err := r.AllocateNext()
if err != nil {
t.Fatal(err)
@@ -95,12 +101,18 @@ func TestAllocate(t *testing.T) {
if f := r.Free(); f != 1 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 200 {
t.Errorf("unexpected used %d", f)
}
if err := r.Allocate(released); err != nil {
t.Fatal(err)
}
if f := r.Free(); f != 0 {
t.Errorf("unexpected free %d", f)
}
if f := r.Used(); f != 201 {
t.Errorf("unexpected used %d", f)
}
}
func TestForEach(t *testing.T) {
@@ -193,3 +205,42 @@ func TestSnapshot(t *testing.T) {
t.Errorf("counts do not match: %d", other.Free())
}
}
func TestNewFromSnapshot(t *testing.T) {
pr, err := net.ParsePortRange("200-300")
if err != nil {
t.Fatal(err)
}
r := NewPortAllocator(*pr)
allocated := []int{}
for i := 0; i < 50; i++ {
p, err := r.AllocateNext()
if err != nil {
t.Fatal(err)
}
allocated = append(allocated, p)
}
snapshot := api.RangeAllocation{}
if err = r.Snapshot(&snapshot); err != nil {
t.Fatal(err)
}
r, err = NewFromSnapshot(&snapshot)
if err != nil {
t.Fatal(err)
}
if x := r.Free(); x != 51 {
t.Fatalf("expected 51 free ports, got %d", x)
}
if x := r.Used(); x != 50 {
t.Fatalf("expected 50 used ports, got %d", x)
}
for _, p := range allocated {
if !r.Has(p) {
t.Fatalf("expected port to be allocated, but it was not")
}
}
}

View File

@@ -5,6 +5,7 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@@ -24,3 +25,16 @@ go_library(
"//pkg/util/wait:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["repair_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//pkg/util/net:go_default_library",
],
)

View File

@@ -38,8 +38,13 @@ type Repair struct {
serviceClient coreclient.ServicesGetter
portRange net.PortRange
alloc rangeallocation.RangeRegistry
leaks map[int]int // counter per leaked port
}
// How many times we need to detect a leak before we clean up. This is to
// avoid races between allocating a port and using it.
const numRepairsBeforeLeakCleanup = 3
// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
@@ -48,6 +53,7 @@ func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter,
serviceClient: serviceClient,
portRange: portRange,
alloc: alloc,
leaks: map[int]int{},
}
}
@@ -76,21 +82,28 @@ func (c *Repair) runOnce() error {
// If etcd server is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and etcd at the same time.
var latest *api.RangeAllocation
var err error
for i := 0; i < 10; i++ {
if latest, err = c.alloc.Get(); err != nil {
time.Sleep(time.Second)
} else {
break
}
}
var snapshot *api.RangeAllocation
err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
var err error
snapshot, err = c.alloc.Get()
return err == nil, err
})
if err != nil {
return fmt.Errorf("unable to refresh the port block: %v", err)
return fmt.Errorf("unable to refresh the port allocations: %v", err)
}
// If not yet initialized.
if snapshot.Range == "" {
snapshot.Range = c.portRange.String()
}
// Create an allocator because it is easy to use.
stored, err := portallocator.NewFromSnapshot(snapshot)
if err != nil {
return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
}
// We explicitly send no resource version, since the resource version
// of 'latest' is from a different collection, it's not comparable to
// of 'snapshot' is from a different collection, it's not comparable to
// the service collection. The caching layer keeps per-collection RVs,
// and this is proper, since in theory the collections could be hosted
// in separate etcd (or even non-etcd) instances.
@@ -99,7 +112,8 @@ func (c *Repair) runOnce() error {
return fmt.Errorf("unable to refresh the port block: %v", err)
}
r := portallocator.NewPortAllocator(c.portRange)
rebuilt := portallocator.NewPortAllocator(c.portRange)
// Check every Service's ports, and rebuild the state as we think it should be.
for i := range list.Items {
svc := &list.Items[i]
ports := service.CollectServiceNodePorts(svc)
@@ -108,18 +122,27 @@ func (c *Repair) runOnce() error {
}
for _, port := range ports {
switch err := r.Allocate(port); err {
switch err := rebuilt.Allocate(port); err {
case nil:
if stored.Has(port) {
// remove it from the old set, so we can find leaks
stored.Release(port)
} else {
// doesn't seem to be allocated
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
}
delete(c.leaks, port) // it is used, so it can't be leaked
case portallocator.ErrAllocated:
// TODO: send event
// port is broken, reallocate
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
// port is duplicate, reallocate
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
case err.(*portallocator.ErrNotInRange):
// TODO: send event
// port is broken, reallocate
// port is out of range, reallocate
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
case portallocator.ErrFull:
// TODO: send event
// somehow we are out of ports
return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)
default:
return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
@@ -127,12 +150,33 @@ func (c *Repair) runOnce() error {
}
}
err = r.Snapshot(latest)
if err != nil {
// Check for ports that are left in the old set. They appear to have been leaked.
stored.ForEach(func(port int) {
count, found := c.leaks[port]
switch {
case !found:
// flag it to be cleaned up after any races (hopefully) are gone
runtime.HandleError(fmt.Errorf("the node port %d may have leaked: flagging for later clean up", port))
count = numRepairsBeforeLeakCleanup - 1
fallthrough
case count > 0:
// pretend it is still in use until count expires
c.leaks[port] = count - 1
if err := rebuilt.Allocate(port); err != nil {
runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
}
default:
// do not add it to the rebuilt set, which means it will be available for reuse
runtime.HandleError(fmt.Errorf("the node port %d appears to have leaked: cleaning up", port))
}
})
// Blast the rebuilt state into storage.
if err := rebuilt.Snapshot(snapshot); err != nil {
return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
}
if err := c.alloc.CreateOrUpdate(latest); err != nil {
if err := c.alloc.CreateOrUpdate(snapshot); err != nil {
if errors.IsConflict(err) {
return err
}

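The core of the race fix is the leaks map added to Repair: a node port that is present in the stored allocation but not referenced by any Service is not released on the first pass, since a port can be committed to storage before the Service that uses it becomes visible to this controller. The port is kept allocated and only dropped after it has looked leaked for numRepairsBeforeLeakCleanup consecutive passes. A self-contained sketch of just that holdoff pattern (not the actual runOnce code, which interleaves this with rebuilding state from the Service list; the port number is illustrative):

package main

import "fmt"

// Same constant as in the commit: how many repair passes a port must look
// leaked before it is actually released.
const numRepairsBeforeLeakCleanup = 3

type repairState struct {
    leaks map[int]int // countdown per possibly-leaked port
}

// observeLeak is called once per repair pass for a port that is in storage
// but not referenced by any Service. It reports whether the port should
// still be treated as allocated on this pass.
func (s *repairState) observeLeak(port int) bool {
    count, found := s.leaks[port]
    if !found {
        // First sighting: start the countdown and keep the port reserved.
        count = numRepairsBeforeLeakCleanup - 1
    }
    if count > 0 {
        s.leaks[port] = count - 1
        return true
    }
    // Countdown expired: forget the port so it becomes reusable.
    delete(s.leaks, port)
    return false
}

func main() {
    s := &repairState{leaks: map[int]int{}}
    for pass := 1; pass <= numRepairsBeforeLeakCleanup; pass++ {
        fmt.Printf("pass %d: keep port 30123? %v\n", pass, s.observeLeak(30123))
    }
    // Prints true, true, false: the port is held for two passes and released
    // on the third, which is what TestRepairLeak below exercises.
}
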
View File

@@ -0,0 +1,191 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
"k8s.io/kubernetes/pkg/util/net"
)
type mockRangeRegistry struct {
getCalled bool
item *api.RangeAllocation
err error
updateCalled bool
updated *api.RangeAllocation
updateErr error
}
func (r *mockRangeRegistry) Get() (*api.RangeAllocation, error) {
r.getCalled = true
return r.item, r.err
}
func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
r.updateCalled = true
r.updated = alloc
return r.updateErr
}
func TestRepair(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
registry := &mockRangeRegistry{
item: &api.RangeAllocation{Range: "100-200"},
}
pr, _ := net.ParsePortRange(registry.item.Range)
r := NewRepair(0, fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
if !registry.updateCalled || registry.updated == nil || registry.updated.Range != pr.String() || registry.updated != registry.item {
t.Errorf("unexpected registry: %#v", registry)
}
registry = &mockRangeRegistry{
item: &api.RangeAllocation{Range: "100-200"},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
}
func TestRepairLeak(t *testing.T) {
pr, _ := net.ParsePortRange("100-200")
previous := portallocator.NewPortAllocator(*pr)
previous.Allocate(111)
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
fakeClient := fake.NewSimpleClientset()
registry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: api.ObjectMeta{
ResourceVersion: "1",
},
Range: dst.Range,
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.Core(), *pr, registry)
// Run through the "leak detection holdoff" loops.
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := portallocator.NewFromSnapshot(registry.updated)
if err != nil {
t.Fatal(err)
}
if !after.Has(111) {
t.Errorf("expected portallocator to still have leaked port")
}
}
// Run one more time to actually remove the leak.
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := portallocator.NewFromSnapshot(registry.updated)
if err != nil {
t.Fatal(err)
}
if after.Has(111) {
t.Errorf("expected portallocator to not have leaked port")
}
}
func TestRepairWithExisting(t *testing.T) {
pr, _ := net.ParsePortRange("100-200")
previous := portallocator.NewPortAllocator(*pr)
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
fakeClient := fake.NewSimpleClientset(
&api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "one", Name: "one"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 111}},
},
},
&api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "two", Name: "two"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 122}, {NodePort: 133}},
},
},
&api.Service{ // outside range, will be dropped
ObjectMeta: api.ObjectMeta{Namespace: "three", Name: "three"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 201}},
},
},
&api.Service{ // empty, ignored
ObjectMeta: api.ObjectMeta{Namespace: "four", Name: "four"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{}},
},
},
&api.Service{ // duplicate, dropped
ObjectMeta: api.ObjectMeta{Namespace: "five", Name: "five"},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{NodePort: 111}},
},
},
)
registry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: api.ObjectMeta{
ResourceVersion: "1",
},
Range: dst.Range,
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
after, err := portallocator.NewFromSnapshot(registry.updated)
if err != nil {
t.Fatal(err)
}
if !after.Has(111) || !after.Has(122) || !after.Has(133) {
t.Errorf("unexpected portallocator state: %#v", after)
}
if free := after.Free(); free != 98 {
t.Errorf("unexpected portallocator state: %d free", free)
}
}