mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-27 23:48:30 +00:00
Merge pull request #49705 from atlassian/rbuf
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Ring buffer for shared informer notifications **What this PR does / why we need it**: Improves memory allocation for shared informer listeners. Instead of always appending to the slice use as a ring buffer, avoiding reslice operations as long as there is room in the slice. See https://github.com/kubernetes/kubernetes/pull/47045#issuecomment-317621259 for details. This is a follow up PR for #47045. Results from BenchmarkListener: ``` Current code (from the #47045): 1000000 1540 ns/op 109 B/op 1 allocs/op ``` ``` New code: 1000000 1162 ns/op 16 B/op 1 allocs/op ``` **Special notes for your reviewer**: Only review the last commit, this branch is based on #47045 PR. I'll rebase onto master once it is merged. **Release note**: ```release-note NONE ``` /kind enhancement /sig api-machinery /cc @deads2k @ncdc Kubernetes-commit: bb035a2854e690d726ece2f8c5e1b8f4b7aef930
This commit is contained in:
commit
a6c294a7f3
1
tools/cache/BUILD
vendored
1
tools/cache/BUILD
vendored
@ -80,6 +80,7 @@ go_library(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/tools/pager:go_default_library",
|
"//vendor/k8s.io/client-go/tools/pager:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/util/buffer:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
3
tools/cache/processor_listener_test.go
vendored
3
tools/cache/processor_listener_test.go
vendored
@ -34,11 +34,12 @@ func BenchmarkListener(b *testing.B) {
|
|||||||
var swg sync.WaitGroup
|
var swg sync.WaitGroup
|
||||||
swg.Add(b.N)
|
swg.Add(b.N)
|
||||||
b.SetParallelism(concurrencyLevel)
|
b.SetParallelism(concurrencyLevel)
|
||||||
|
// Preallocate enough space so that benchmark does not run out of it
|
||||||
pl := newProcessListener(&ResourceEventHandlerFuncs{
|
pl := newProcessListener(&ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(obj interface{}) {
|
AddFunc: func(obj interface{}) {
|
||||||
swg.Done()
|
swg.Done()
|
||||||
},
|
},
|
||||||
}, 0, 0, time.Now())
|
}, 0, 0, time.Now(), 1024*1024)
|
||||||
var wg wait.Group
|
var wg wait.Group
|
||||||
defer wg.Wait() // Wait for .run and .pop to stop
|
defer wg.Wait() // Wait for .run and .pop to stop
|
||||||
defer close(pl.addCh) // Tell .run and .pop to stop
|
defer close(pl.addCh) // Tell .run and .pop to stop
|
||||||
|
39
tools/cache/shared_informer.go
vendored
39
tools/cache/shared_informer.go
vendored
@ -25,6 +25,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/util/buffer"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
@ -92,8 +93,13 @@ func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEve
|
|||||||
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
|
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
|
||||||
type InformerSynced func() bool
|
type InformerSynced func() bool
|
||||||
|
|
||||||
// syncedPollPeriod controls how often you look at the status of your sync funcs
|
const (
|
||||||
const syncedPollPeriod = 100 * time.Millisecond
|
// syncedPollPeriod controls how often you look at the status of your sync funcs
|
||||||
|
syncedPollPeriod = 100 * time.Millisecond
|
||||||
|
|
||||||
|
// initialBufferSize is the initial number of event notifications that can be buffered.
|
||||||
|
initialBufferSize = 1024
|
||||||
|
)
|
||||||
|
|
||||||
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
|
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
|
||||||
// if the controller should shutdown
|
// if the controller should shutdown
|
||||||
@ -313,7 +319,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now())
|
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
|
||||||
|
|
||||||
if !s.started {
|
if !s.started {
|
||||||
s.processor.addListener(listener)
|
s.processor.addListener(listener)
|
||||||
@ -465,6 +471,13 @@ type processorListener struct {
|
|||||||
|
|
||||||
handler ResourceEventHandler
|
handler ResourceEventHandler
|
||||||
|
|
||||||
|
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
|
||||||
|
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
|
||||||
|
// added until we OOM.
|
||||||
|
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
|
||||||
|
// we should try to do something better.
|
||||||
|
pendingNotifications buffer.RingGrowing
|
||||||
|
|
||||||
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
|
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
|
||||||
requestedResyncPeriod time.Duration
|
requestedResyncPeriod time.Duration
|
||||||
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
|
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
|
||||||
@ -477,11 +490,12 @@ type processorListener struct {
|
|||||||
resyncLock sync.Mutex
|
resyncLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
|
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
|
||||||
ret := &processorListener{
|
ret := &processorListener{
|
||||||
nextCh: make(chan interface{}),
|
nextCh: make(chan interface{}),
|
||||||
addCh: make(chan interface{}),
|
addCh: make(chan interface{}),
|
||||||
handler: handler,
|
handler: handler,
|
||||||
|
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
|
||||||
requestedResyncPeriod: requestedResyncPeriod,
|
requestedResyncPeriod: requestedResyncPeriod,
|
||||||
resyncPeriod: resyncPeriod,
|
resyncPeriod: resyncPeriod,
|
||||||
}
|
}
|
||||||
@ -499,25 +513,16 @@ func (p *processorListener) pop() {
|
|||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
defer close(p.nextCh) // Tell .run() to stop
|
defer close(p.nextCh) // Tell .run() to stop
|
||||||
|
|
||||||
// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
|
|
||||||
// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
|
|
||||||
// added until we OOM.
|
|
||||||
// TODO This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
|
|
||||||
// we should try to do something better
|
|
||||||
var pendingNotifications []interface{}
|
|
||||||
var nextCh chan<- interface{}
|
var nextCh chan<- interface{}
|
||||||
var notification interface{}
|
var notification interface{}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case nextCh <- notification:
|
case nextCh <- notification:
|
||||||
// Notification dispatched
|
// Notification dispatched
|
||||||
if len(pendingNotifications) == 0 { // Nothing to pop
|
var ok bool
|
||||||
|
notification, ok = p.pendingNotifications.ReadOne()
|
||||||
|
if !ok { // Nothing to pop
|
||||||
nextCh = nil // Disable this select case
|
nextCh = nil // Disable this select case
|
||||||
notification = nil
|
|
||||||
} else {
|
|
||||||
notification = pendingNotifications[0]
|
|
||||||
pendingNotifications[0] = nil
|
|
||||||
pendingNotifications = pendingNotifications[1:]
|
|
||||||
}
|
}
|
||||||
case notificationToAdd, ok := <-p.addCh:
|
case notificationToAdd, ok := <-p.addCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -528,7 +533,7 @@ func (p *processorListener) pop() {
|
|||||||
notification = notificationToAdd
|
notification = notificationToAdd
|
||||||
nextCh = p.nextCh
|
nextCh = p.nextCh
|
||||||
} else { // There is already a notification waiting to be dispatched
|
} else { // There is already a notification waiting to be dispatched
|
||||||
pendingNotifications = append(pendingNotifications, notificationToAdd)
|
p.pendingNotifications.WriteOne(notificationToAdd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
28
util/buffer/BUILD
Normal file
28
util/buffer/BUILD
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["ring_growing.go"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["ring_growing_test.go"],
|
||||||
|
library = ":go_default_library",
|
||||||
|
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
72
util/buffer/ring_growing.go
Normal file
72
util/buffer/ring_growing.go
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package buffer
|
||||||
|
|
||||||
|
// RingGrowing is a growing ring buffer.
|
||||||
|
// Not thread safe.
|
||||||
|
type RingGrowing struct {
|
||||||
|
data []interface{}
|
||||||
|
n int // Size of Data
|
||||||
|
beg int // First available element
|
||||||
|
readable int // Number of data items available
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRingGrowing constructs a new RingGrowing instance with provided parameters.
|
||||||
|
func NewRingGrowing(initialSize int) *RingGrowing {
|
||||||
|
return &RingGrowing{
|
||||||
|
data: make([]interface{}, initialSize),
|
||||||
|
n: initialSize,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadOne reads (consumes) first item from the buffer if it is available, otherwise returns false.
|
||||||
|
func (r *RingGrowing) ReadOne() (data interface{}, ok bool) {
|
||||||
|
if r.readable == 0 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
r.readable--
|
||||||
|
element := r.data[r.beg]
|
||||||
|
r.data[r.beg] = nil // Remove reference to the object to help GC
|
||||||
|
if r.beg == r.n-1 {
|
||||||
|
// Was the last element
|
||||||
|
r.beg = 0
|
||||||
|
} else {
|
||||||
|
r.beg++
|
||||||
|
}
|
||||||
|
return element, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteOne adds an item to the end of the buffer, growing it if it is full.
|
||||||
|
func (r *RingGrowing) WriteOne(data interface{}) {
|
||||||
|
if r.readable == r.n {
|
||||||
|
// Time to grow
|
||||||
|
newN := r.n * 2
|
||||||
|
newData := make([]interface{}, newN)
|
||||||
|
to := r.beg + r.readable
|
||||||
|
if to <= r.n {
|
||||||
|
copy(newData, r.data[r.beg:to])
|
||||||
|
} else {
|
||||||
|
copied := copy(newData, r.data[r.beg:])
|
||||||
|
copy(newData[copied:], r.data[:(to%r.n)])
|
||||||
|
}
|
||||||
|
r.beg = 0
|
||||||
|
r.data = newData
|
||||||
|
r.n = newN
|
||||||
|
}
|
||||||
|
r.data[(r.readable+r.beg)%r.n] = data
|
||||||
|
r.readable++
|
||||||
|
}
|
50
util/buffer/ring_growing_test.go
Normal file
50
util/buffer/ring_growing_test.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package buffer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGrowth(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
x := 10
|
||||||
|
g := NewRingGrowing(1)
|
||||||
|
for i := 0; i < x; i++ {
|
||||||
|
assert.Equal(t, i, g.readable)
|
||||||
|
g.WriteOne(i)
|
||||||
|
}
|
||||||
|
read := 0
|
||||||
|
for g.readable > 0 {
|
||||||
|
v, ok := g.ReadOne()
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, read, v)
|
||||||
|
read++
|
||||||
|
}
|
||||||
|
assert.Equalf(t, x, read, "expected to have read %d items: %d", x, read)
|
||||||
|
assert.Zerof(t, g.readable, "expected readable to be zero: %d", g.readable)
|
||||||
|
assert.Equalf(t, g.n, 16, "expected N to be 16: %d", g.n)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEmpty(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
g := NewRingGrowing(1)
|
||||||
|
_, ok := g.ReadOne()
|
||||||
|
assert.False(t, ok)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user