mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-14 05:36:12 +00:00
Delta fifo includes objects in DeleteFinalStateUnknow, rcs stop faster
This commit is contained in:
40
pkg/client/cache/delta_fifo.go
vendored
40
pkg/client/cache/delta_fifo.go
vendored
@@ -22,6 +22,8 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// NewDeltaFIFO returns a Store which can be used process changes to items.
|
||||
@@ -76,6 +78,13 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjectKeys K
|
||||
// threads, you could end up with multiple threads processing slightly
|
||||
// different versions of the same object.
|
||||
//
|
||||
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
|
||||
// to list keys that are "known", for the puspose of figuring out which
|
||||
// items have been deleted when Replace() is called. If the given KeyLister
|
||||
// also satisfies the KeyGetter interface, the deleted objet will be
|
||||
// included in the DeleteFinalStateUnknown markers. These objects
|
||||
// could be stale.
|
||||
//
|
||||
// You may provide a function to compress deltas (e.g., represent a
|
||||
// series of Updates as a single Update).
|
||||
type DeltaFIFO struct {
|
||||
@@ -334,7 +343,21 @@ func (f *DeltaFIFO) Replace(list []interface{}) error {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k}); err != nil {
|
||||
var deletedObj interface{}
|
||||
if keyGetter, ok := f.knownObjectKeys.(KeyGetter); ok {
|
||||
var exists bool
|
||||
var err error
|
||||
deletedObj, exists, err = keyGetter.GetByKey(k)
|
||||
if err != nil || !exists {
|
||||
deletedObj = nil
|
||||
if err != nil {
|
||||
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
|
||||
} else {
|
||||
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -346,12 +369,9 @@ type KeyLister interface {
|
||||
ListKeys() []string
|
||||
}
|
||||
|
||||
// KeyListerFunc adapts a raw function to be a KeyLister.
|
||||
type KeyListerFunc func() []string
|
||||
|
||||
// ListKeys just calls kl.
|
||||
func (kl KeyListerFunc) ListKeys() []string {
|
||||
return kl()
|
||||
// A KeyGetter is anything that knows how to get the value stored under a given key.
|
||||
type KeyGetter interface {
|
||||
GetByKey(key string) (interface{}, bool, error)
|
||||
}
|
||||
|
||||
// DeltaCompressor is an algorithm that removes redundant changes.
|
||||
@@ -427,8 +447,10 @@ func copyDeltas(d Deltas) Deltas {
|
||||
}
|
||||
|
||||
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
|
||||
// an object was deleted but the watch deletion event was was missed.
|
||||
// In this case we don't know the final "resting" state of the object.
|
||||
// an object was deleted but the watch deletion event was missed. In this
|
||||
// case we don't know the final "resting" state of the object, so there's
|
||||
// a chance the included `Obj` is stale.
|
||||
type DeletedFinalStateUnknown struct {
|
||||
Key string
|
||||
Obj interface{}
|
||||
}
|
||||
|
28
pkg/client/cache/delta_fifo_test.go
vendored
28
pkg/client/cache/delta_fifo_test.go
vendored
@@ -27,6 +27,24 @@ func testPop(f *DeltaFIFO) testFifoObject {
|
||||
return f.Pop().(Deltas).Newest().Object.(testFifoObject)
|
||||
}
|
||||
|
||||
// keyLookupFunc adapts a raw function to be a KeyLookup.
|
||||
type keyLookupFunc func() []string
|
||||
|
||||
// ListKeys just calls kl.
|
||||
func (kl keyLookupFunc) ListKeys() []string {
|
||||
return kl()
|
||||
}
|
||||
|
||||
// GetByKey returns the key if it exists in the list returned by kl.
|
||||
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
|
||||
for _, v := range kl() {
|
||||
if v == key {
|
||||
return key, true, nil
|
||||
}
|
||||
}
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_basic(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
const amount = 500
|
||||
@@ -174,7 +192,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
KeyListerFunc(func() []string {
|
||||
keyLookupFunc(func() []string {
|
||||
return []string{"foo", "bar", "baz"}
|
||||
}),
|
||||
)
|
||||
@@ -184,7 +202,9 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
||||
expectedList := []Deltas{
|
||||
{{Deleted, mkFifoObj("baz", 10)}},
|
||||
{{Sync, mkFifoObj("foo", 5)}},
|
||||
{{Deleted, DeletedFinalStateUnknown{Key: "bar"}}},
|
||||
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
||||
// it should get a tombstone key with the right Obj.
|
||||
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: "bar"}}},
|
||||
}
|
||||
|
||||
for _, expected := range expectedList {
|
||||
@@ -259,9 +279,9 @@ func TestDeltaFIFO_KeyOf(t *testing.T) {
|
||||
key string
|
||||
}{
|
||||
{obj: testFifoObject{name: "A"}, key: "A"},
|
||||
{obj: DeletedFinalStateUnknown{Key: "B"}, key: "B"},
|
||||
{obj: DeletedFinalStateUnknown{Key: "B", Obj: nil}, key: "B"},
|
||||
{obj: Deltas{{Object: testFifoObject{name: "C"}}}, key: "C"},
|
||||
{obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D"}}}, key: "D"},
|
||||
{obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D", Obj: nil}}}, key: "D"},
|
||||
}
|
||||
|
||||
for _, item := range table {
|
||||
|
Reference in New Issue
Block a user