Merge pull request #81325 from tedyu/etcd-ret-err

Propagate error from NewEtcd
This commit is contained in:
Kubernetes Prow Robot 2019-08-16 10:26:09 -07:00 committed by GitHub
commit b581f97009
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 161 additions and 57 deletions

View File

@ -201,23 +201,35 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
if err != nil {
return nil, err
}
serviceClusterIPRegistry = etcd
return etcd
return etcd, nil
})
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
}
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
var serviceNodePortRegistry rangeallocation.RangeRegistry
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
if err != nil {
return nil, err
}
serviceNodePortRegistry = etcd
return etcd
return etcd, nil
})
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
}
restStorage.ServiceNodePortAllocator = serviceNodePortRegistry
controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)

View File

@ -39,4 +39,4 @@ type Snapshottable interface {
Restore(string, []byte) error
}
type AllocatorFactory func(max int, rangeSpec string) Interface
type AllocatorFactory func(max int, rangeSpec string) (Interface, error)

View File

@ -60,10 +60,10 @@ var _ rangeallocation.RangeRegistry = &Etcd{}
// NewEtcd returns an allocator that is backed by Etcd and can manage
// persisting the snapshot state of allocation after each allocation is made.
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) *Etcd {
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) (*Etcd, error) {
storage, d, err := generic.NewRawStorage(config)
if err != nil {
panic(err) // TODO: Propagate error up
return nil, err
}
// TODO : Remove RegisterStorageCleanup below when PR
@ -76,7 +76,7 @@ func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.Grou
storage: storage,
baseKey: baseKey,
resource: resource,
}
}, nil
}
// Allocate attempts to allocate the item locally and then in etcd.

View File

@ -31,7 +31,10 @@ import (
func newStorage(t *testing.T) (*Etcd, *etcd3testing.EtcdTestServer, allocator.Interface, *storagebackend.Config) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
mem := allocator.NewAllocationMap(100, "rangeSpecValue")
etcd := NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage)
etcd, err := NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage)
if err != nil {
t.Fatalf("unexpected error creating etcd: %v", err)
}
return etcd, server, mem, etcdStorage
}
@ -91,7 +94,10 @@ func TestStore(t *testing.T) {
}
other = allocator.NewAllocationMap(100, "rangeSpecValue")
otherStorage := NewEtcd(other, "/ranges/serviceips", api.Resource("serviceipallocations"), config)
otherStorage, err := NewEtcd(other, "/ranges/serviceips", api.Resource("serviceipallocations"), config)
if err != nil {
t.Fatalf("unexpected error creating etcd: %v", err)
}
if ok, err := otherStorage.Allocate(2); ok || err != nil {
t.Fatal(err)
}

View File

@ -78,7 +78,7 @@ type Range struct {
}
// NewAllocatorCIDRRange creates a Range over a net.IPNet, calling allocatorFactory to construct the backing store.
func NewAllocatorCIDRRange(cidr *net.IPNet, allocatorFactory allocator.AllocatorFactory) *Range {
func NewAllocatorCIDRRange(cidr *net.IPNet, allocatorFactory allocator.AllocatorFactory) (*Range, error) {
max := RangeSize(cidr)
base := bigForIP(cidr.IP)
rangeSpec := cidr.String()
@ -88,14 +88,15 @@ func NewAllocatorCIDRRange(cidr *net.IPNet, allocatorFactory allocator.Allocator
base: base.Add(base, big.NewInt(1)), // don't use the network base
max: maximum(0, int(max-2)), // don't use the network broadcast,
}
r.alloc = allocatorFactory(r.max, rangeSpec)
return &r
var err error
r.alloc, err = allocatorFactory(r.max, rangeSpec)
return &r, err
}
// Helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store.
func NewCIDRRange(cidr *net.IPNet) *Range {
return NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) allocator.Interface {
return allocator.NewAllocationMap(max, rangeSpec)
func NewCIDRRange(cidr *net.IPNet) (*Range, error) {
return NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewAllocationMap(max, rangeSpec), nil
})
}
@ -105,7 +106,10 @@ func NewFromSnapshot(snap *api.RangeAllocation) (*Range, error) {
if err != nil {
return nil, err
}
r := NewCIDRRange(ipnet)
r, err := NewCIDRRange(ipnet)
if err != nil {
return nil, err
}
if err := r.Restore(ipnet, snap.Data); err != nil {
return nil, err
}

View File

@ -61,7 +61,10 @@ func TestAllocate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
r := NewCIDRRange(cidr)
r, err := NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
t.Logf("base: %v", r.base.Bytes())
if f := r.Free(); f != tc.free {
t.Errorf("Test %s unexpected free %d", tc.name, f)
@ -148,7 +151,10 @@ func TestAllocateTiny(t *testing.T) {
if err != nil {
t.Fatal(err)
}
r := NewCIDRRange(cidr)
r, err := NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
if f := r.Free(); f != 0 {
t.Errorf("free: %d", f)
}
@ -162,7 +168,10 @@ func TestAllocateSmall(t *testing.T) {
if err != nil {
t.Fatal(err)
}
r := NewCIDRRange(cidr)
r, err := NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
if f := r.Free(); f != 2 {
t.Errorf("free: %d", f)
}
@ -252,7 +261,10 @@ func TestForEach(t *testing.T) {
}
for i, tc := range testCases {
r := NewCIDRRange(cidr)
r, err := NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
for ips := range tc {
ip := net.ParseIP(ips)
if err := r.Allocate(ip); err != nil {
@ -280,7 +292,10 @@ func TestSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
r := NewCIDRRange(cidr)
r, err := NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
ip := []net.IP{}
for i := 0; i < 10; i++ {
n, err := r.AllocateNext()
@ -309,11 +324,17 @@ func TestSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
other := NewCIDRRange(otherCidr)
other, err := NewCIDRRange(otherCidr)
if err != nil {
t.Fatal(err)
}
if err := r.Restore(otherCidr, dst.Data); err != ErrMismatchedNetwork {
t.Fatal(err)
}
other = NewCIDRRange(network)
other, err = NewCIDRRange(network)
if err != nil {
t.Fatal(err)
}
if err := other.Restore(network, dst.Data); err != nil {
t.Fatal(err)
}
@ -333,7 +354,10 @@ func TestNewFromSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
r := NewCIDRRange(cidr)
r, err := NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
allocated := []net.IP{}
for i := 0; i < 128; i++ {
ip, err := r.AllocateNext()

View File

@ -135,7 +135,10 @@ func (c *Repair) runOnce() error {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
}
rebuilt := ipallocator.NewCIDRRange(c.network)
rebuilt, err := ipallocator.NewCIDRRange(c.network)
if err != nil {
return fmt.Errorf("unable to create CIDR range: %v", err)
}
// Check every Service's ClusterIP, and rebuild the state as we think it should be.
for _, svc := range list.Items {
if !helper.IsServiceIPSet(&svc) {

View File

@ -77,11 +77,14 @@ func TestRepair(t *testing.T) {
func TestRepairLeak(t *testing.T) {
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
previous := ipallocator.NewCIDRRange(cidr)
previous, err := ipallocator.NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
previous.Allocate(net.ParseIP("192.168.1.10"))
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
err = previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
@ -126,10 +129,13 @@ func TestRepairLeak(t *testing.T) {
func TestRepairWithExisting(t *testing.T) {
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
previous := ipallocator.NewCIDRRange(cidr)
previous, err := ipallocator.NewCIDRRange(cidr)
if err != nil {
t.Fatal(err)
}
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
err = previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}

View File

@ -41,12 +41,18 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, ipallocator.Interfa
}
var backing allocator.Interface
storage := ipallocator.NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) allocator.Interface {
storage, err := ipallocator.NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
mem := allocator.NewAllocationMap(max, rangeSpec)
backing = mem
etcd := allocatorstore.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage)
return etcd
etcd, err := allocatorstore.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage)
if err != nil {
return nil, err
}
return etcd, nil
})
if err != nil {
t.Fatalf("unexpected error creating etcd: %v", err)
}
s, d, err := generic.NewRawStorage(etcdStorage)
if err != nil {
t.Fatalf("Couldn't create storage: %v", err)

View File

@ -63,21 +63,22 @@ type PortAllocator struct {
var _ Interface = &PortAllocator{}
// NewPortAllocatorCustom creates a PortAllocator over a net.PortRange, calling allocatorFactory to construct the backing store.
func NewPortAllocatorCustom(pr net.PortRange, allocatorFactory allocator.AllocatorFactory) *PortAllocator {
func NewPortAllocatorCustom(pr net.PortRange, allocatorFactory allocator.AllocatorFactory) (*PortAllocator, error) {
max := pr.Size
rangeSpec := pr.String()
a := &PortAllocator{
portRange: pr,
}
a.alloc = allocatorFactory(max, rangeSpec)
return a
var err error
a.alloc, err = allocatorFactory(max, rangeSpec)
return a, err
}
// Helper that wraps NewPortAllocatorCustom, for creating a range backed by an in-memory store.
func NewPortAllocator(pr net.PortRange) *PortAllocator {
return NewPortAllocatorCustom(pr, func(max int, rangeSpec string) allocator.Interface {
return allocator.NewAllocationMap(max, rangeSpec)
func NewPortAllocator(pr net.PortRange) (*PortAllocator, error) {
return NewPortAllocatorCustom(pr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewAllocationMap(max, rangeSpec), nil
})
}
@ -87,7 +88,10 @@ func NewFromSnapshot(snap *api.RangeAllocation) (*PortAllocator, error) {
if err != nil {
return nil, err
}
r := NewPortAllocator(*pr)
r, err := NewPortAllocator(*pr)
if err != nil {
return nil, err
}
if err := r.Restore(*pr, snap.Data); err != nil {
return nil, err
}

View File

@ -31,7 +31,10 @@ func TestAllocate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
r := NewPortAllocator(*pr)
r, err := NewPortAllocator(*pr)
if err != nil {
t.Fatal(err)
}
if f := r.Free(); f != 201 {
t.Errorf("unexpected free %d", f)
}
@ -129,7 +132,10 @@ func TestForEach(t *testing.T) {
}
for i, tc := range testCases {
r := NewPortAllocator(*pr)
r, err := NewPortAllocator(*pr)
if err != nil {
t.Fatal(err)
}
for port := range tc {
if err := r.Allocate(port); err != nil {
@ -158,7 +164,10 @@ func TestSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
r := NewPortAllocator(*pr)
r, err := NewPortAllocator(*pr)
if err != nil {
t.Fatal(err)
}
ports := []int{}
for i := 0; i < 10; i++ {
port, err := r.AllocateNext()
@ -187,11 +196,17 @@ func TestSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
other := NewPortAllocator(*otherPr)
other, err := NewPortAllocator(*otherPr)
if err != nil {
t.Fatal(err)
}
if err := r.Restore(*otherPr, dst.Data); err != ErrMismatchedNetwork {
t.Fatal(err)
}
other = NewPortAllocator(*pr2)
other, err = NewPortAllocator(*pr2)
if err != nil {
t.Fatal(err)
}
if err := other.Restore(*pr2, dst.Data); err != nil {
t.Fatal(err)
}
@ -211,7 +226,10 @@ func TestNewFromSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
r := NewPortAllocator(*pr)
r, err := NewPortAllocator(*pr)
if err != nil {
t.Fatal(err)
}
allocated := []int{}
for i := 0; i < 50; i++ {
p, err := r.AllocateNext()

View File

@ -121,7 +121,10 @@ func (c *Repair) runOnce() error {
return fmt.Errorf("unable to refresh the port block: %v", err)
}
rebuilt := portallocator.NewPortAllocator(c.portRange)
rebuilt, err := portallocator.NewPortAllocator(c.portRange)
if err != nil {
return fmt.Errorf("unable to create port allocator: %v", err)
}
// Check every Service's ports, and rebuild the state as we think it should be.
for i := range list.Items {
svc := &list.Items[i]

View File

@ -77,11 +77,14 @@ func TestRepair(t *testing.T) {
func TestRepairLeak(t *testing.T) {
pr, _ := net.ParsePortRange("100-200")
previous := portallocator.NewPortAllocator(*pr)
previous, err := portallocator.NewPortAllocator(*pr)
if err != nil {
t.Fatal(err)
}
previous.Allocate(111)
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
err = previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
@ -126,10 +129,13 @@ func TestRepairLeak(t *testing.T) {
func TestRepairWithExisting(t *testing.T) {
pr, _ := net.ParsePortRange("100-200")
previous := portallocator.NewPortAllocator(*pr)
previous, err := portallocator.NewPortAllocator(*pr)
if err != nil {
t.Fatal(err)
}
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
err = previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}

View File

@ -31,7 +31,10 @@ func TestDryRunAllocate(t *testing.T) {
// Allocate some ports before calling
previouslyAllocated := []int{10000, 10010, 10020}
r := NewPortAllocator(*pr)
r, err := NewPortAllocator(*pr)
if err != nil {
t.Fatal(err)
}
for _, port := range previouslyAllocated {
_ = r.Allocate(port)
}
@ -74,7 +77,10 @@ func TestDryRunAllocateNext(t *testing.T) {
// Allocate some ports before calling
previouslyAllocated := []int{10000, 10010, 10020}
r := NewPortAllocator(*pr)
r, err := NewPortAllocator(*pr)
if err != nil {
t.Fatal(err)
}
for _, port := range previouslyAllocated {
_ = r.Allocate(port)
}

View File

@ -212,10 +212,16 @@ func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.P
}
}
r := ipallocator.NewCIDRRange(makeIPNet(t))
r, err := ipallocator.NewCIDRRange(makeIPNet(t))
if err != nil {
t.Fatalf("cannot create CIDR Range %v", err)
}
portRange := utilnet.PortRange{Base: 30000, Size: 1000}
portAllocator := portallocator.NewPortAllocator(portRange)
portAllocator, err := portallocator.NewPortAllocator(portRange)
if err != nil {
t.Fatalf("cannot create port allocator %v", err)
}
rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, r, portAllocator, nil)