Refactor WithRequireLeader to make it part of the etcd store

This commit is contained in:
Marek Siarkowicz 2023-06-21 14:21:24 +02:00
parent 9e0569f2ed
commit a9af2de8fd
2 changed files with 13 additions and 10 deletions

View File

@ -885,7 +885,18 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions)
if err != nil {
return nil, err
}
return s.watcher.Watch(ctx, preparedKey, int64(rev), opts.Recursive, opts.ProgressNotify, s.transformer, opts.Predicate)
return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), opts.Recursive, opts.ProgressNotify, s.transformer, opts.Predicate)
}
func (s *store) watchContext(ctx context.Context) context.Context {
// The etcd server waits until it cannot find a leader for 3 election
// timeouts to cancel existing streams. 3 is currently a hard coded
// constant. The election timeout defaults to 1000ms. If the cluster is
// healthy, when the leader is stopped, the leadership transfer should be
// smooth. (leader transfers its leadership before stopping). If leader is
// hard killed, other servers will take an election timeout to realize
// leader lost and start campaign.
return clientv3.WithRequireLeader(ctx)
}
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {

View File

@ -144,15 +144,7 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
// The filter doesn't filter out any object.
wc.internalPred = storage.Everything
}
// The etcd server waits until it cannot find a leader for 3 election
// timeouts to cancel existing streams. 3 is currently a hard coded
// constant. The election timeout defaults to 1000ms. If the cluster is
// healthy, when the leader is stopped, the leadership transfer should be
// smooth. (leader transfers its leadership before stopping). If leader is
// hard killed, other servers will take an election timeout to realize
// leader lost and start campaign.
wc.ctx, wc.cancel = context.WithCancel(clientv3.WithRequireLeader(ctx))
wc.ctx, wc.cancel = context.WithCancel(ctx)
return wc
}