mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Upgrade ListWatchUntil
This commit is contained in:
parent
09af8485f2
commit
603dd254ac
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package tests
|
package tests
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"testing"
|
"testing"
|
||||||
@ -194,11 +195,20 @@ func (w lw) Watch(options metav1.ListOptions) (watch.Interface, error) {
|
|||||||
func TestListWatchUntil(t *testing.T) {
|
func TestListWatchUntil(t *testing.T) {
|
||||||
fw := watch.NewFake()
|
fw := watch.NewFake()
|
||||||
go func() {
|
go func() {
|
||||||
var obj *v1.Pod
|
obj := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
ResourceVersion: "2",
|
||||||
|
},
|
||||||
|
}
|
||||||
fw.Modify(obj)
|
fw.Modify(obj)
|
||||||
}()
|
}()
|
||||||
listwatch := lw{
|
listwatch := lw{
|
||||||
list: &v1.PodList{Items: []v1.Pod{{}}},
|
list: &v1.PodList{
|
||||||
|
ListMeta: metav1.ListMeta{
|
||||||
|
ResourceVersion: "1",
|
||||||
|
},
|
||||||
|
Items: []v1.Pod{{}},
|
||||||
|
},
|
||||||
watch: fw,
|
watch: fw,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,8 +223,9 @@ func TestListWatchUntil(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := 10 * time.Second
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
lastEvent, err := watchtools.ListWatchUntil(timeout, listwatch, conditions...)
|
defer cancel()
|
||||||
|
lastEvent, err := watchtools.ListWatchUntil(ctx, listwatch, conditions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected nil error, got %#v", err)
|
t.Fatalf("expected nil error, got %#v", err)
|
||||||
}
|
}
|
||||||
|
@ -168,13 +168,14 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (
|
|||||||
return context.WithTimeout(parent, timeout)
|
return context.WithTimeout(parent, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
|
// ListWatchUntil first lists objects, converts them into synthetic ADDED events
|
||||||
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
|
// and checks conditions for those synthetic events. If the conditions have not been reached so far
|
||||||
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
|
// it continues by calling Until which establishes a watch from resourceVersion of the list call
|
||||||
// TODO: remove when no longer used
|
// to evaluate those conditions based on new events.
|
||||||
//
|
// ListWatchUntil provides the same guarantees as Until and replaces the old WATCH from RV "" (or "0")
|
||||||
// Deprecated: Use UntilWithSync instead.
|
// which was mixing list and watch calls internally and having severe design issues. (see #74022)
|
||||||
func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) {
|
// There is no resourceVersion order guarantee for the initial list and those synthetic events.
|
||||||
|
func ListWatchUntil(ctx context.Context, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) {
|
||||||
if len(conditions) == 0 {
|
if len(conditions) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -231,17 +232,5 @@ func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ..
|
|||||||
}
|
}
|
||||||
currResourceVersion := metaObj.GetResourceVersion()
|
currResourceVersion := metaObj.GetResourceVersion()
|
||||||
|
|
||||||
watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
|
return Until(ctx, currResourceVersion, lw, remainingConditions...)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout)
|
|
||||||
defer cancel()
|
|
||||||
evt, err := UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
|
|
||||||
if err == ErrWatchClosed {
|
|
||||||
// present a consistent error interface to callers
|
|
||||||
err = wait.ErrWaitTimeout
|
|
||||||
}
|
|
||||||
return evt, err
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user