cache/reflector: check the value of the initial-events-end annotation

Kubernetes-commit: 04668c00432cbd552b08eb94f829643facbd2061
This commit is contained in:
Lukasz Szaszkiewicz 2023-09-19 12:59:23 +02:00 committed by Kubernetes Publisher
parent b5b9332330
commit d0ea06d597
2 changed files with 22 additions and 1 deletions

View File

@ -764,7 +764,7 @@ loop:
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}

View File

@ -350,6 +350,27 @@ func TestWatchList(t *testing.T) {
expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p3", "3")},
expectedError: apierrors.NewResourceExpired("rv already expired"),
},
{
name: "prove that the reflector is checking the value of the initialEventsEnd annotation",
closeAfterWatchEvents: 3,
watchEvents: []watch.Event{
{Type: watch.Added, Object: makePod("p1", "1")},
{Type: watch.Added, Object: makePod("p2", "2")},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{"k8s.io/initial-events-end": "false"},
},
}},
},
expectedWatchRequests: 1,
expectedRequestOptions: []metav1.ListOptions{{
SendInitialEvents: pointer.Bool(true),
AllowWatchBookmarks: true,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: pointer.Int64(1),
}},
},
}
for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {