Send list before ever calling Watch

This commit is contained in:
Darren Shepherd
2019-08-09 17:24:25 -07:00
parent e806137905
commit e71d8fb0df

View File

@@ -1,7 +1,6 @@
package proxy package proxy
import ( import (
"context"
"sync" "sync"
"github.com/rancher/norman/pkg/types" "github.com/rancher/norman/pkg/types"
@@ -119,35 +118,33 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.
return nil, err return nil, err
} }
timeout := int64(60 * 30)
watcher, err := k8sClient.Watch(metav1.ListOptions{
Watch: true,
TimeoutSeconds: &timeout,
ResourceVersion: list.GetResourceVersion(),
})
if err != nil {
return nil, err
}
watchingContext, cancelWatchingContext := context.WithCancel(apiOp.Request.Context())
go func() {
<-watchingContext.Done()
logrus.Debugf("stopping watcher for %s", schema.ID)
watcher.Stop()
}()
result := make(chan types.APIEvent) result := make(chan types.APIEvent)
go func() { go func() {
defer func() {
logrus.Debugf("closing watcher for %s", schema.ID)
close(result)
}()
for i, obj := range list.Items { for i, obj := range list.Items {
result <- s.toAPIEvent(apiOp, schema, i, len(list.Items), false, &obj) result <- s.toAPIEvent(apiOp, schema, i, len(list.Items), false, &obj)
} }
timeout := int64(60 * 30)
watcher, err := k8sClient.Watch(metav1.ListOptions{
Watch: true,
TimeoutSeconds: &timeout,
ResourceVersion: list.GetResourceVersion(),
})
if err != nil {
logrus.Debugf("stopping watch for %s: %v", schema.ID, err)
return
}
defer watcher.Stop()
for event := range watcher.ResultChan() { for event := range watcher.ResultChan() {
data := event.Object.(*unstructured.Unstructured) data := event.Object.(*unstructured.Unstructured)
result <- s.toAPIEvent(apiOp, schema, 0, 0, event.Type == watch.Deleted, data) result <- s.toAPIEvent(apiOp, schema, 0, 0, event.Type == watch.Deleted, data)
} }
logrus.Debugf("closing watcher for %s", schema.ID)
close(result)
cancelWatchingContext()
}() }()
return result, nil return result, nil