mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Merge pull request #59825 from dcbw/remove-deltafifo-compressor
Automatic merge from submit-queue (batch tested with PRs 59832, 59825). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Remove unused DeltaFIFO compressor argument to NewDeltaFIFO Nobody uses it; the one or two older users from 1.4/1.5 timeframe were removed for 1.6. It's also poorly understood and the sole example is in the testcases, and it's pretty incomplete. If anyone really wants compression, they can revert this PR. Earlier pull was https://github.com/kubernetes/kubernetes/pull/43475 which was blocked on some downstream users, which have now removed their usage of the compressor. @ncdc @deads2k ```release-note NONE ```
This commit is contained in:
commit
5f7b530d87
@ -288,7 +288,7 @@ func NewInformer(
|
||||
// This will hold incoming changes. Note how we pass clientState in as a
|
||||
// KeyLister, that way resync operations will result in the correct set
|
||||
// of update/delete deltas.
|
||||
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
|
||||
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
@ -355,7 +355,7 @@ func NewIndexerInformer(
|
||||
// This will hold incoming changes. Note how we pass clientState in as a
|
||||
// KeyLister, that way resync operations will result in the correct set
|
||||
// of update/delete deltas.
|
||||
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
|
||||
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
|
@ -44,7 +44,7 @@ func Example() {
|
||||
// This will hold incoming changes. Note how we pass downstream in as a
|
||||
// KeyLister, that way resync operations will result in the correct set
|
||||
// of update/delete deltas.
|
||||
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, downstream)
|
||||
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, downstream)
|
||||
|
||||
// Let's do threadsafe output to get predictable test results.
|
||||
deletionCounter := make(chan string, 1000)
|
||||
|
@ -31,11 +31,6 @@ import (
|
||||
// keyFunc is used to figure out what key an object should have. (It's
|
||||
// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
|
||||
//
|
||||
// 'compressor' may compress as many or as few items as it wants
|
||||
// (including returning an empty slice), but it should do what it
|
||||
// does quickly since it is called while the queue is locked.
|
||||
// 'compressor' may be nil if you don't want any delta compression.
|
||||
//
|
||||
// 'keyLister' is expected to return a list of keys that the consumer of
|
||||
// this queue "knows about". It is used to decide which items are missing
|
||||
// when Replace() is called; 'Deleted' deltas are produced for these items.
|
||||
@ -48,13 +43,12 @@ import (
|
||||
// fix.
|
||||
//
|
||||
// Also see the comment on DeltaFIFO.
|
||||
func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
|
||||
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
|
||||
f := &DeltaFIFO{
|
||||
items: map[string]Deltas{},
|
||||
queue: []string{},
|
||||
keyFunc: keyFunc,
|
||||
deltaCompressor: compressor,
|
||||
knownObjects: knownObjects,
|
||||
items: map[string]Deltas{},
|
||||
queue: []string{},
|
||||
keyFunc: keyFunc,
|
||||
knownObjects: knownObjects,
|
||||
}
|
||||
f.cond.L = &f.lock
|
||||
return f
|
||||
@ -86,9 +80,6 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyL
|
||||
// items have been deleted when Replace() or Delete() are called. The deleted
|
||||
// object will be included in the DeleteFinalStateUnknown markers. These objects
|
||||
// could be stale.
|
||||
//
|
||||
// You may provide a function to compress deltas (e.g., represent a
|
||||
// series of Updates as a single Update).
|
||||
type DeltaFIFO struct {
|
||||
// lock/cond protects access to 'items' and 'queue'.
|
||||
lock sync.RWMutex
|
||||
@ -110,10 +101,6 @@ type DeltaFIFO struct {
|
||||
// insertion and retrieval, and should be deterministic.
|
||||
keyFunc KeyFunc
|
||||
|
||||
// deltaCompressor tells us how to combine two or more
|
||||
// deltas. It may be nil.
|
||||
deltaCompressor DeltaCompressor
|
||||
|
||||
// knownObjects list keys that are "known", for the
|
||||
// purpose of figuring out which items have been deleted
|
||||
// when Replace() or Delete() is called.
|
||||
@ -133,7 +120,6 @@ var (
|
||||
var (
|
||||
// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
|
||||
// object with zero length is encountered (should be impossible,
|
||||
// even if such an object is accidentally produced by a DeltaCompressor--
|
||||
// but included for completeness).
|
||||
ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
|
||||
)
|
||||
@ -305,8 +291,8 @@ func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
|
||||
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
|
||||
}
|
||||
|
||||
// queueActionLocked appends to the delta list for the object, calling
|
||||
// f.deltaCompressor if needed. Caller must lock first.
|
||||
// queueActionLocked appends to the delta list for the object.
|
||||
// Caller must lock first.
|
||||
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
|
||||
id, err := f.KeyOf(obj)
|
||||
if err != nil {
|
||||
@ -322,9 +308,6 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
||||
|
||||
newDeltas := append(f.items[id], Delta{actionType, obj})
|
||||
newDeltas = dedupDeltas(newDeltas)
|
||||
if f.deltaCompressor != nil {
|
||||
newDeltas = f.deltaCompressor.Compress(newDeltas)
|
||||
}
|
||||
|
||||
_, exists := f.items[id]
|
||||
if len(newDeltas) > 0 {
|
||||
@ -334,8 +317,7 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
||||
f.items[id] = newDeltas
|
||||
f.cond.Broadcast()
|
||||
} else if exists {
|
||||
// The compression step removed all deltas, so
|
||||
// we need to remove this from our map (extra items
|
||||
// We need to remove this from our map (extra items
|
||||
// in the queue are ignored if they are not in the
|
||||
// map).
|
||||
delete(f.items, id)
|
||||
@ -355,8 +337,8 @@ func (f *DeltaFIFO) List() []interface{} {
|
||||
func (f *DeltaFIFO) listLocked() []interface{} {
|
||||
list := make([]interface{}, 0, len(f.items))
|
||||
for _, item := range f.items {
|
||||
// Copy item's slice so operations on this slice (delta
|
||||
// compression) won't interfere with the object we return.
|
||||
// Copy item's slice so operations on this slice
|
||||
// won't interfere with the object we return.
|
||||
item = copyDeltas(item)
|
||||
list = append(list, item.Newest().Object)
|
||||
}
|
||||
@ -394,8 +376,8 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
|
||||
defer f.lock.RUnlock()
|
||||
d, exists := f.items[key]
|
||||
if exists {
|
||||
// Copy item's slice so operations on this slice (delta
|
||||
// compression) won't interfere with the object we return.
|
||||
// Copy item's slice so operations on this slice
|
||||
// won't interfere with the object we return.
|
||||
d = copyDeltas(d)
|
||||
}
|
||||
return d, exists, nil
|
||||
@ -603,23 +585,6 @@ type KeyGetter interface {
|
||||
GetByKey(key string) (interface{}, bool, error)
|
||||
}
|
||||
|
||||
// DeltaCompressor is an algorithm that removes redundant changes.
|
||||
type DeltaCompressor interface {
|
||||
Compress(Deltas) Deltas
|
||||
}
|
||||
|
||||
// DeltaCompressorFunc should remove redundant changes; but changes that
|
||||
// are redundant depend on one's desired semantics, so this is an
|
||||
// injectable function.
|
||||
//
|
||||
// DeltaCompressorFunc adapts a raw function to be a DeltaCompressor.
|
||||
type DeltaCompressorFunc func(Deltas) Deltas
|
||||
|
||||
// Compress just calls dc.
|
||||
func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas {
|
||||
return dc(d)
|
||||
}
|
||||
|
||||
// DeltaType is the type of a change (addition, deletion, etc)
|
||||
type DeltaType string
|
||||
|
||||
@ -668,7 +633,7 @@ func (d Deltas) Newest() *Delta {
|
||||
|
||||
// copyDeltas returns a shallow copy of d; that is, it copies the slice but not
|
||||
// the objects in the slice. This allows Get/List to return an object that we
|
||||
// know won't be clobbered by a subsequent call to a delta compressor.
|
||||
// know won't be clobbered by a subsequent modifications.
|
||||
func copyDeltas(d Deltas) Deltas {
|
||||
d2 := make(Deltas, len(d))
|
||||
copy(d2, d)
|
||||
|
@ -51,7 +51,7 @@ func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_basic(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
|
||||
const amount = 500
|
||||
go func() {
|
||||
for i := 0; i < amount; i++ {
|
||||
@ -86,7 +86,7 @@ func TestDeltaFIFO_basic(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
|
||||
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
_, err := f.Pop(func(obj interface{}) error {
|
||||
@ -129,43 +129,8 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_compressorWorks(t *testing.T) {
|
||||
oldestTypes := []DeltaType{}
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
// This function just keeps the most recent delta
|
||||
// and puts deleted ones in the list.
|
||||
DeltaCompressorFunc(func(d Deltas) Deltas {
|
||||
if n := len(d); n > 1 {
|
||||
oldestTypes = append(oldestTypes, d[0].Type)
|
||||
d = d[1:]
|
||||
}
|
||||
return d
|
||||
}),
|
||||
nil,
|
||||
)
|
||||
if f.HasSynced() {
|
||||
t.Errorf("Expected HasSynced to be false before completion of initial population")
|
||||
}
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
f.Update(mkFifoObj("foo", 12))
|
||||
f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0")
|
||||
f.Delete(mkFifoObj("foo", 22))
|
||||
f.Add(mkFifoObj("foo", 25)) // flush the last one out
|
||||
expect := []DeltaType{Added, Updated, Sync, Deleted}
|
||||
if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) {
|
||||
t.Fatalf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
if !f.HasSynced() {
|
||||
t.Errorf("Expected HasSynced to be true after completion of initial population")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_addUpdate(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
f.Update(mkFifoObj("foo", 12))
|
||||
f.Delete(mkFifoObj("foo", 15))
|
||||
@ -203,7 +168,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
f.Update(mkFifoObj("bar", 15))
|
||||
f.Add(mkFifoObj("qux", 17))
|
||||
@ -226,7 +191,6 @@ func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
|
||||
func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||
}),
|
||||
@ -249,7 +213,7 @@ func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_addReplace(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0")
|
||||
got := make(chan testFifoObject, 2)
|
||||
@ -277,7 +241,6 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
|
||||
func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5)}
|
||||
}),
|
||||
@ -297,7 +260,6 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
|
||||
func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{}
|
||||
}),
|
||||
@ -317,7 +279,6 @@ func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
|
||||
func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||
}),
|
||||
@ -344,7 +305,6 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
||||
func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5)}
|
||||
}),
|
||||
@ -367,7 +327,6 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
|
||||
func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||
}),
|
||||
@ -396,7 +355,7 @@ func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_detectLineJumpers(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
|
||||
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
f.Add(mkFifoObj("bar", 1))
|
||||
@ -424,7 +383,7 @@ func TestDeltaFIFO_detectLineJumpers(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_addIfNotPresent(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
|
||||
|
||||
f.Add(mkFifoObj("b", 3))
|
||||
b3 := Pop(f)
|
||||
@ -521,7 +480,7 @@ func TestDeltaFIFO_HasSynced(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
|
||||
|
||||
for _, action := range test.actions {
|
||||
action(f)
|
||||
|
@ -189,7 +189,7 @@ type deleteNotification struct {
|
||||
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
|
||||
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
|
Loading…
Reference in New Issue
Block a user