Clean storage shutdown for allocators

This commit is contained in:
Wojciech Tyczyński 2022-05-06 12:01:06 +02:00
parent 7141b6d58d
commit 1b72a0f5a7
7 changed files with 52 additions and 12 deletions

View File

@ -198,6 +198,10 @@ func (r *AllocationBitmap) Restore(rangeSpec string, data []byte) error {
return nil return nil
} }
// Destroy cleans up everything on shutdown.
func (r *AllocationBitmap) Destroy() {
}
// randomScanStrategy chooses a random address from the provided big.Int, and then // randomScanStrategy chooses a random address from the provided big.Int, and then
// scans forward looking for the next available address (it will wrap the range if // scans forward looking for the next available address (it will wrap the range if
// necessary). // necessary).

View File

@ -29,6 +29,11 @@ type Interface interface {
// For testing // For testing
Free() int Free() int
// Destroy shuts down all internal structures.
// Destroy needs to be implemented in thread-safe way and be prepared for being
// called more than once.
Destroy()
} }
// Snapshottable is an Interface that can be snapshotted and restored. Snapshottable // Snapshottable is an Interface that can be snapshotted and restored. Snapshottable

View File

@ -26,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
storeerr "k8s.io/apiserver/pkg/storage/errors" storeerr "k8s.io/apiserver/pkg/storage/errors"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
@ -53,6 +52,8 @@ type Etcd struct {
baseKey string baseKey string
resource schema.GroupResource resource schema.GroupResource
destroyFn func()
} }
// Etcd implements allocator.Interface and rangeallocation.RangeRegistry // Etcd implements allocator.Interface and rangeallocation.RangeRegistry
@ -67,16 +68,13 @@ func NewEtcd(alloc allocator.Snapshottable, baseKey string, config *storagebacke
return nil, err return nil, err
} }
// TODO : Remove RegisterStorageCleanup below when PR var once sync.Once
// https://github.com/kubernetes/kubernetes/pull/50690
// merges as that shuts down storage properly
registry.RegisterStorageCleanup(d)
return &Etcd{ return &Etcd{
alloc: alloc, alloc: alloc,
storage: storage, storage: storage,
baseKey: baseKey, baseKey: baseKey,
resource: config.GroupResource, resource: config.GroupResource,
destroyFn: func() { once.Do(d) },
}, nil }, nil
} }
@ -220,7 +218,7 @@ func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error {
return err return err
} }
// Implements allocator.Interface::Has // Has implements allocator.Interface::Has
func (e *Etcd) Has(item int) bool { func (e *Etcd) Has(item int) bool {
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
@ -228,10 +226,15 @@ func (e *Etcd) Has(item int) bool {
return e.alloc.Has(item) return e.alloc.Has(item)
} }
// Implements allocator.Interface::Free // Free implements allocator.Interface::Free
func (e *Etcd) Free() int { func (e *Etcd) Free() int {
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
return e.alloc.Free() return e.alloc.Free()
} }
// Destroy implement allocator.Interface::Destroy
func (e *Etcd) Destroy() {
e.destroyFn()
}

View File

@ -39,6 +39,7 @@ type Interface interface {
CIDR() net.IPNet CIDR() net.IPNet
IPFamily() api.IPFamily IPFamily() api.IPFamily
Has(ip net.IP) bool Has(ip net.IP) bool
Destroy()
// DryRun offers a way to try operations without persisting them. // DryRun offers a way to try operations without persisting them.
DryRun() Interface DryRun() Interface
@ -358,6 +359,11 @@ func (r *Range) contains(ip net.IP) (bool, int) {
return true, offset return true, offset
} }
// Destroy shuts down internal allocator.
func (r *Range) Destroy() {
r.alloc.Destroy()
}
// calculateIPOffset calculates the integer offset of ip from base such that // calculateIPOffset calculates the integer offset of ip from base such that
// base + offset = ip. It requires ip >= base. // base + offset = ip. It requires ip >= base.
func calculateIPOffset(base *big.Int, ip net.IP) int { func calculateIPOffset(base *big.Int, ip net.IP) int {
@ -427,3 +433,6 @@ func (dry dryRunRange) DryRun() Interface {
func (dry dryRunRange) Has(ip net.IP) bool { func (dry dryRunRange) Has(ip net.IP) bool {
return dry.real.Has(ip) return dry.real.Has(ip)
} }
func (dry dryRunRange) Destroy() {
}

View File

@ -35,6 +35,7 @@ type Interface interface {
Release(int) error Release(int) error
ForEach(func(int)) ForEach(func(int))
Has(int) bool Has(int) bool
Destroy()
} }
var ( var (
@ -207,3 +208,8 @@ func (r *PortAllocator) contains(port int) (bool, int) {
offset := port - r.portRange.Base offset := port - r.portRange.Base
return true, offset return true, offset
} }
// Destroy shuts down internal allocator.
func (r *PortAllocator) Destroy() {
r.alloc.Destroy()
}

View File

@ -899,6 +899,13 @@ func (al *Allocators) releaseClusterIPs(service *api.Service) (released map[api.
return al.releaseIPs(toRelease) return al.releaseIPs(toRelease)
} }
func (al *Allocators) Destroy() {
al.serviceNodePorts.Destroy()
for _, a := range al.serviceIPAllocatorsByFamily {
a.Destroy()
}
}
// This is O(N), but we expect haystack to be small; // This is O(N), but we expect haystack to be small;
// so small that we expect a linear search to be faster // so small that we expect a linear search to be faster
func containsNumber(haystack []int, needle int) bool { func containsNumber(haystack []int, needle int) bool {

View File

@ -157,6 +157,12 @@ func (r *REST) Categories() []string {
return []string{"all"} return []string{"all"}
} }
// Destroy cleans up everything on shutdown.
func (r *REST) Destroy() {
r.Store.Destroy()
r.alloc.Destroy()
}
// StatusREST implements the REST endpoint for changing the status of a service. // StatusREST implements the REST endpoint for changing the status of a service.
type StatusREST struct { type StatusREST struct {
store *genericregistry.Store store *genericregistry.Store