mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-14 22:33:34 +00:00
Merge pull request #131161 from wojtek-t/automated-cherry-pick-of-#131020-upstream-release-1.30
Automated cherry pick of #131020: Fix race for sending errors in watch
This commit is contained in:
commit
df92ced36b
@ -431,7 +431,12 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case e := <-wc.incomingEventChan:
|
case e := <-wc.incomingEventChan:
|
||||||
res := wc.transform(e)
|
res, err := wc.transform(e)
|
||||||
|
if err != nil {
|
||||||
|
wc.sendError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if res == nil {
|
if res == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -465,12 +470,11 @@ func (wc *watchChan) acceptAll() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// transform transforms an event into a result for user if not filtered.
|
// transform transforms an event into a result for user if not filtered.
|
||||||
func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
func (wc *watchChan) transform(e *event) (res *watch.Event, err error) {
|
||||||
curObj, oldObj, err := wc.prepareObjs(e)
|
curObj, oldObj, err := wc.prepareObjs(e)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("failed to prepare current and previous objects: %v", err)
|
klog.Errorf("failed to prepare current and previous objects: %v", err)
|
||||||
wc.sendError(err)
|
return nil, err
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
@ -478,12 +482,11 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
|||||||
object := wc.watcher.newFunc()
|
object := wc.watcher.newFunc()
|
||||||
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
|
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
|
||||||
klog.Errorf("failed to propagate object version: %v", err)
|
klog.Errorf("failed to propagate object version: %v", err)
|
||||||
return nil
|
return nil, fmt.Errorf("failed to propagate object resource version: %w", err)
|
||||||
}
|
}
|
||||||
if e.isInitialEventsEndBookmark {
|
if e.isInitialEventsEndBookmark {
|
||||||
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
|
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
|
||||||
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err))
|
return nil, fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %w", wc.watcher.groupResource, wc.watcher.objectType, object, err)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
res = &watch.Event{
|
res = &watch.Event{
|
||||||
@ -492,7 +495,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
|||||||
}
|
}
|
||||||
case e.isDeleted:
|
case e.isDeleted:
|
||||||
if !wc.filter(oldObj) {
|
if !wc.filter(oldObj) {
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
res = &watch.Event{
|
res = &watch.Event{
|
||||||
Type: watch.Deleted,
|
Type: watch.Deleted,
|
||||||
@ -500,7 +503,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
|||||||
}
|
}
|
||||||
case e.isCreated:
|
case e.isCreated:
|
||||||
if !wc.filter(curObj) {
|
if !wc.filter(curObj) {
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
res = &watch.Event{
|
res = &watch.Event{
|
||||||
Type: watch.Added,
|
Type: watch.Added,
|
||||||
@ -512,7 +515,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
|||||||
Type: watch.Modified,
|
Type: watch.Modified,
|
||||||
Object: curObj,
|
Object: curObj,
|
||||||
}
|
}
|
||||||
return res
|
return res, nil
|
||||||
}
|
}
|
||||||
curObjPasses := wc.filter(curObj)
|
curObjPasses := wc.filter(curObj)
|
||||||
oldObjPasses := wc.filter(oldObj)
|
oldObjPasses := wc.filter(oldObj)
|
||||||
@ -534,7 +537,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return res
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func transformErrorToEvent(err error) *watch.Event {
|
func transformErrorToEvent(err error) *watch.Event {
|
||||||
|
@ -138,6 +138,11 @@ func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
|
|||||||
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
|
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchErrorEventIsBlockingFurtherEvent(t *testing.T) {
|
||||||
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunWatchErrorIsBlockingFurtherEvents(ctx, t, &storeWithPrefixTransformer{store})
|
||||||
|
}
|
||||||
|
|
||||||
// =======================================================================
|
// =======================================================================
|
||||||
// Implementation-specific tests are following.
|
// Implementation-specific tests are following.
|
||||||
// The following tests are exercising the details of the implementation
|
// The following tests are exercising the details of the implementation
|
||||||
|
@ -1550,6 +1550,73 @@ func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, st
|
|||||||
testCheckNoMoreResults(t, w)
|
testCheckNoMoreResults(t, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RunWatchErrorIsBlockingFurtherEvents(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
|
||||||
|
foo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"}}
|
||||||
|
fooKey := fmt.Sprintf("/pods/%s/%s", foo.Namespace, foo.Name)
|
||||||
|
fooCreated := &example.Pod{}
|
||||||
|
if err := store.Create(context.Background(), fooKey, foo, fooCreated, 0); err != nil {
|
||||||
|
t.Errorf("failed to create object: %v", err)
|
||||||
|
}
|
||||||
|
bar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "bar"}}
|
||||||
|
barKey := fmt.Sprintf("/pods/%s/%s", bar.Namespace, bar.Name)
|
||||||
|
barCreated := &example.Pod{}
|
||||||
|
if err := store.Create(context.Background(), barKey, bar, barCreated, 0); err != nil {
|
||||||
|
t.Errorf("failed to create object: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update transformer to ensure that foo will become effectively corrupted.
|
||||||
|
revertTransformer := store.UpdatePrefixTransformer(
|
||||||
|
func(transformer *PrefixTransformer) value.Transformer {
|
||||||
|
transformer.prefix = []byte("other-prefix")
|
||||||
|
return transformer
|
||||||
|
})
|
||||||
|
defer revertTransformer()
|
||||||
|
|
||||||
|
baz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "baz"}}
|
||||||
|
bazKey := fmt.Sprintf("/pods/%s/%s", baz.Namespace, baz.Name)
|
||||||
|
bazCreated := &example.Pod{}
|
||||||
|
if err := store.Create(context.Background(), bazKey, baz, bazCreated, 0); err != nil {
|
||||||
|
t.Errorf("failed to create object: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := storage.ListOptions{
|
||||||
|
ResourceVersion: fooCreated.ResourceVersion,
|
||||||
|
Predicate: storage.Everything,
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run N concurrent watches. Given the asynchronous nature, we increase the
|
||||||
|
// probability of hitting the race in at least one of those watches.
|
||||||
|
concurrentWatches := 10
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for i := 0; i < concurrentWatches; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
w, err := store.Watch(ctx, "/pods", opts)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to create watch: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We issue the watch starting from object bar.
|
||||||
|
// The object fails TransformFromStorage and generates ERROR watch event.
|
||||||
|
// The further events (i.e. ADDED event for baz object) should not be
|
||||||
|
// emitted, so we verify no events other than ERROR type are emitted.
|
||||||
|
for {
|
||||||
|
event, ok := <-w.ResultChan()
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if event.Type != watch.Error {
|
||||||
|
t.Errorf("unexpected event: %#v", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func makePod(namePrefix string) *example.Pod {
|
func makePod(namePrefix string) *example.Pod {
|
||||||
return &example.Pod{
|
return &example.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Loading…
Reference in New Issue
Block a user