mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 01:06:27 +00:00
Merge pull request #34925 from wojtek-t/additional_tracing_in_cacher
Automatic merge from submit-queue Add tracing to listing in Cacher
This commit is contained in:
commit
57a47b6af5
@ -367,7 +367,11 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trace := util.NewTrace(fmt.Sprintf("cacher %v: List", c.objectType.String()))
|
||||||
|
defer trace.LogIfLong(250 * time.Millisecond)
|
||||||
|
|
||||||
c.ready.wait()
|
c.ready.wait()
|
||||||
|
trace.Step("Ready")
|
||||||
|
|
||||||
// List elements from cache, with at least 'listRV'.
|
// List elements from cache, with at least 'listRV'.
|
||||||
listPtr, err := meta.GetItemsPtr(listObj)
|
listPtr, err := meta.GetItemsPtr(listObj)
|
||||||
@ -380,10 +384,11 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
}
|
}
|
||||||
filter := filterFunction(key, c.keyFunc, pred)
|
filter := filterFunction(key, c.keyFunc, pred)
|
||||||
|
|
||||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
|
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to wait for fresh list: %v", err)
|
return fmt.Errorf("failed to wait for fresh list: %v", err)
|
||||||
}
|
}
|
||||||
|
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
|
||||||
for _, obj := range objs {
|
for _, obj := range objs {
|
||||||
object, ok := obj.(runtime.Object)
|
object, ok := obj.(runtime.Object)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -393,6 +398,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
|
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
|
||||||
if c.versioner != nil {
|
if c.versioner != nil {
|
||||||
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
|
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/meta"
|
"k8s.io/kubernetes/pkg/api/meta"
|
||||||
"k8s.io/kubernetes/pkg/client/cache"
|
"k8s.io/kubernetes/pkg/client/cache"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/util"
|
||||||
"k8s.io/kubernetes/pkg/util/clock"
|
"k8s.io/kubernetes/pkg/util/clock"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
@ -206,7 +207,7 @@ func (w *watchCache) List() []interface{} {
|
|||||||
return w.store.List()
|
return w.store.List()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
|
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
|
||||||
startTime := w.clock.Now()
|
startTime := w.clock.Now()
|
||||||
go func() {
|
go func() {
|
||||||
// Wake us up when the time limit has expired. The docs
|
// Wake us up when the time limit has expired. The docs
|
||||||
@ -228,6 +229,9 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{
|
|||||||
}
|
}
|
||||||
w.cond.Wait()
|
w.cond.Wait()
|
||||||
}
|
}
|
||||||
|
if trace != nil {
|
||||||
|
trace.Step("Cache is fresh enough")
|
||||||
|
}
|
||||||
return w.store.List(), w.resourceVersion, nil
|
return w.store.List(), w.resourceVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,7 +248,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||||||
store.Add(makeTestPod("bar", 5))
|
store.Add(makeTestPod("bar", 5))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
list, resourceVersion, err := store.WaitUntilFreshAndList(5)
|
list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -278,7 +278,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
|
|||||||
store.Add(makeTestPod("bar", 5))
|
store.Add(makeTestPod("bar", 5))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_, _, err := store.WaitUntilFreshAndList(5)
|
_, _, err := store.WaitUntilFreshAndList(5, nil)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("unexpected lack of timeout error")
|
t.Fatalf("unexpected lack of timeout error")
|
||||||
}
|
}
|
||||||
@ -300,7 +300,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||||||
store := newTestWatchCache(5)
|
store := newTestWatchCache(5)
|
||||||
|
|
||||||
{
|
{
|
||||||
_, version, err := store.WaitUntilFreshAndList(0)
|
_, version, err := store.WaitUntilFreshAndList(0, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -323,7 +323,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||||||
r.ListAndWatch(wait.NeverStop)
|
r.ListAndWatch(wait.NeverStop)
|
||||||
|
|
||||||
{
|
{
|
||||||
_, version, err := store.WaitUntilFreshAndList(10)
|
_, version, err := store.WaitUntilFreshAndList(10, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user