Vendor in containers/(common, storage, image)

Signed-off-by: Daniel J Walsh <dwalsh@redhat.com>
This commit is contained in:
Daniel J Walsh
2022-05-03 09:59:43 -04:00
parent b8b0e9937b
commit 3c286dd1d1
152 changed files with 9926 additions and 1403 deletions

View File

@@ -17,31 +17,21 @@ import (
// Bar represents a progress bar.
type Bar struct {
priority int // used by heap
index int // used by heap
toShutdown bool
toDrop bool
noPop bool
hasEwmaDecorators bool
operateState chan func(*bState)
frameCh chan *frame
// cancel is called either by user or on complete event
cancel func()
// done is closed after cacheState is assigned
done chan struct{}
// cacheState is populated, right after close(b.done)
cacheState *bState
index int // used by heap
priority int // used by heap
hasEwma bool
frameCh chan *renderFrame
operateState chan func(*bState)
done chan struct{}
container *Progress
bs *bState
cancel func()
recoveredPanic interface{}
}
type extenderFunc func(in io.Reader, reqWidth int, st decor.Statistics) (out io.Reader, lines int)
// bState is actual bar state. It gets passed to *Bar.serve(...) monitor
// goroutine.
// bState is actual bar's state.
type bState struct {
id int
priority int
@@ -52,7 +42,6 @@ type bState struct {
lastIncrement int64
trimSpace bool
completed bool
completeFlushed bool
aborted bool
triggerComplete bool
dropOnComplete bool
@@ -66,29 +55,28 @@ type bState struct {
filler BarFiller
middleware func(BarFiller) BarFiller
extender extenderFunc
debugOut io.Writer
// runningBar is a key for *pState.parkedBars
runningBar *Bar
debugOut io.Writer
afterBar *Bar // key for (*pState).queueBars
sync bool
}
type frame struct {
reader io.Reader
lines int
type renderFrame struct {
reader io.Reader
lines int
shutdown bool
}
func newBar(container *Progress, bs *bState) *Bar {
ctx, cancel := context.WithCancel(container.ctx)
bar := &Bar{
container: container,
priority: bs.priority,
toDrop: bs.dropOnComplete,
noPop: bs.noPop,
hasEwma: len(bs.ewmaDecorators) != 0,
frameCh: make(chan *renderFrame, 1),
operateState: make(chan func(*bState)),
frameCh: make(chan *frame, 1),
done: make(chan struct{}),
container: container,
cancel: cancel,
}
@@ -97,12 +85,20 @@ func newBar(container *Progress, bs *bState) *Bar {
}
// ProxyReader wraps r with metrics required for progress tracking.
// Panics if r is nil.
// If r is 'unknown total/size' reader it's mandatory to call
// (*Bar).SetTotal(-1, true) method after (Reader).Read returns io.EOF.
// Panics if r is nil. If bar is already completed or aborted, returns
// nil.
func (b *Bar) ProxyReader(r io.Reader) io.ReadCloser {
if r == nil {
panic("expected non nil io.Reader")
}
return b.newProxyReader(r)
select {
case <-b.done:
return nil
default:
return b.newProxyReader(r)
}
}
// ID returs id of the bar.
@@ -112,18 +108,18 @@ func (b *Bar) ID() int {
case b.operateState <- func(s *bState) { result <- s.id }:
return <-result
case <-b.done:
return b.cacheState.id
return b.bs.id
}
}
// Current returns bar's current number, in other words sum of all increments.
// Current returns bar's current value, in other words sum of all increments.
func (b *Bar) Current() int64 {
result := make(chan int64)
select {
case b.operateState <- func(s *bState) { result <- s.current }:
return <-result
case <-b.done:
return b.cacheState.current
return b.bs.current
}
}
@@ -142,7 +138,7 @@ func (b *Bar) SetRefill(amount int64) {
// TraverseDecorators traverses all available decorators and calls cb func on each.
func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) {
done := make(chan struct{})
sync := make(chan struct{})
select {
case b.operateState <- func(s *bState) {
for _, decorators := range [...][]decor.Decorator{
@@ -153,28 +149,56 @@ func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) {
cb(extractBaseDecorator(d))
}
}
close(done)
close(sync)
}:
<-done
<-sync
case <-b.done:
}
}
// SetTotal sets total dynamically.
// If total is negative it takes progress' current value.
func (b *Bar) SetTotal(total int64, triggerComplete bool) {
// EnableTriggerComplete enables triggering complete event. It's
// effective only for bar which was constructed with `total <= 0` and
// after total has been set with (*Bar).SetTotal(int64, false). If bar
// has been incremented to the total, complete event is triggered right
// away.
func (b *Bar) EnableTriggerComplete() {
select {
case b.operateState <- func(s *bState) {
s.triggerComplete = triggerComplete
if s.triggerComplete || s.total <= 0 {
return
}
if s.current >= s.total {
s.current = s.total
s.completed = true
go b.forceRefresh()
} else {
s.triggerComplete = true
}
}:
case <-b.done:
}
}
// SetTotal sets total to an arbitrary value. It's effective only for
// bar which was constructed with `total <= 0`. Setting total to negative
// value is equivalent to (*Bar).SetTotal((*Bar).Current(), bool).
// If triggerCompleteNow is true, total value is set to current and
// complete event is triggered right away.
func (b *Bar) SetTotal(total int64, triggerCompleteNow bool) {
select {
case b.operateState <- func(s *bState) {
if s.triggerComplete {
return
}
if total < 0 {
s.total = s.current
} else {
s.total = total
}
if s.triggerComplete && !s.completed {
if triggerCompleteNow {
s.current = s.total
s.completed = true
go b.forceRefreshIfLastUncompleted()
go b.forceRefresh()
}
}:
case <-b.done:
@@ -191,7 +215,7 @@ func (b *Bar) SetCurrent(current int64) {
if s.triggerComplete && s.current >= s.total {
s.current = s.total
s.completed = true
go b.forceRefreshIfLastUncompleted()
go b.forceRefresh()
}
}:
case <-b.done:
@@ -220,7 +244,7 @@ func (b *Bar) IncrInt64(n int64) {
if s.triggerComplete && s.current >= s.total {
s.current = s.total
s.completed = true
go b.forceRefreshIfLastUncompleted()
go b.forceRefresh()
}
}:
case <-b.done:
@@ -242,9 +266,9 @@ func (b *Bar) DecoratorEwmaUpdate(dur time.Duration) {
}
}:
case <-b.done:
if b.cacheState.lastIncrement > 0 {
b.cacheState.decoratorEwmaUpdate(dur)
b.cacheState.lastIncrement = 0
if b.bs.lastIncrement > 0 {
b.bs.decoratorEwmaUpdate(dur)
b.bs.lastIncrement = 0
}
}
}
@@ -270,44 +294,33 @@ func (b *Bar) SetPriority(priority int) {
// Abort interrupts bar's running goroutine. Abort won't be engaged
// if bar is already in complete state. If drop is true bar will be
// removed as well.
// removed as well. To make sure that bar has been removed call
// (*Bar).Wait method.
func (b *Bar) Abort(drop bool) {
done := make(chan struct{})
select {
case b.operateState <- func(s *bState) {
if s.completed {
close(done)
if s.completed || s.aborted {
return
}
s.aborted = true
b.cancel()
// container must be run during lifetime of this inner goroutine
// we control this by done channel declared above
go func() {
if drop {
b.container.dropBar(b)
} else {
var uncompleted int
b.container.traverseBars(func(bar *Bar) bool {
if b != bar && !bar.Completed() {
uncompleted++
return false
}
return true
})
if uncompleted == 0 {
b.container.refreshCh <- time.Now()
}
}
close(done) // release hold of Abort
}()
s.dropOnComplete = drop
go b.forceRefresh()
}:
// guarantee: container is alive during lifetime of this hold
<-done
case <-b.done:
}
}
// Aborted reports whether the bar is in aborted state.
func (b *Bar) Aborted() bool {
result := make(chan bool)
select {
case b.operateState <- func(s *bState) { result <- s.aborted }:
return <-result
case <-b.done:
return b.bs.aborted
}
}
// Completed reports whether the bar is in completed state.
func (b *Bar) Completed() bool {
result := make(chan bool)
@@ -315,19 +328,28 @@ func (b *Bar) Completed() bool {
case b.operateState <- func(s *bState) { result <- s.completed }:
return <-result
case <-b.done:
return true
return b.bs.completed
}
}
func (b *Bar) serve(ctx context.Context, s *bState) {
// Wait blocks until bar is completed or aborted.
func (b *Bar) Wait() {
<-b.done
}
func (b *Bar) serve(ctx context.Context, bs *bState) {
defer b.container.bwg.Done()
if bs.afterBar != nil && bs.sync {
bs.afterBar.Wait()
}
for {
select {
case op := <-b.operateState:
op(s)
op(bs)
case <-ctx.Done():
s.decoratorShutdownNotify()
b.cacheState = s
bs.aborted = !bs.completed
bs.decoratorShutdownNotify()
b.bs = bs
close(b.done)
return
}
@@ -337,79 +359,62 @@ func (b *Bar) serve(ctx context.Context, s *bState) {
func (b *Bar) render(tw int) {
select {
case b.operateState <- func(s *bState) {
var reader io.Reader
var lines int
stat := newStatistics(tw, s)
defer func() {
// recovering if user defined decorator panics for example
if p := recover(); p != nil {
if b.recoveredPanic == nil {
if s.debugOut != nil {
fmt.Fprintln(s.debugOut, p)
_, _ = s.debugOut.Write(debug.Stack())
}
s.extender = makePanicExtender(p)
b.toShutdown = !b.toShutdown
b.recoveredPanic = p
if s.debugOut != nil {
fmt.Fprintln(s.debugOut, p)
_, _ = s.debugOut.Write(debug.Stack())
}
reader, lines := s.extender(nil, s.reqWidth, stat)
b.frameCh <- &frame{reader, lines + 1}
s.aborted = !s.completed
s.extender = makePanicExtender(p)
reader, lines = s.extender(nil, s.reqWidth, stat)
b.recoveredPanic = p
}
s.completeFlushed = s.completed
frame := renderFrame{
reader: reader,
lines: lines + 1,
shutdown: s.completed || s.aborted,
}
if frame.shutdown {
b.cancel()
}
b.frameCh <- &frame
}()
reader, lines := s.extender(s.draw(stat), s.reqWidth, stat)
b.toShutdown = s.completed && !s.completeFlushed
b.frameCh <- &frame{reader, lines + 1}
}:
case <-b.done:
s := b.cacheState
stat := newStatistics(tw, s)
var r io.Reader
if b.recoveredPanic == nil {
r = s.draw(stat)
reader = s.draw(stat)
}
reader, lines := s.extender(r, s.reqWidth, stat)
b.frameCh <- &frame{reader, lines + 1}
}
}
func (b *Bar) subscribeDecorators() {
var averageDecorators []decor.AverageDecorator
var ewmaDecorators []decor.EwmaDecorator
var shutdownListeners []decor.ShutdownListener
b.TraverseDecorators(func(d decor.Decorator) {
if d, ok := d.(decor.AverageDecorator); ok {
averageDecorators = append(averageDecorators, d)
}
if d, ok := d.(decor.EwmaDecorator); ok {
ewmaDecorators = append(ewmaDecorators, d)
}
if d, ok := d.(decor.ShutdownListener); ok {
shutdownListeners = append(shutdownListeners, d)
}
})
b.hasEwmaDecorators = len(ewmaDecorators) != 0
select {
case b.operateState <- func(s *bState) {
s.averageDecorators = averageDecorators
s.ewmaDecorators = ewmaDecorators
s.shutdownListeners = shutdownListeners
reader, lines = s.extender(reader, s.reqWidth, stat)
}:
case <-b.done:
var reader io.Reader
var lines int
stat, s := newStatistics(tw, b.bs), b.bs
if b.recoveredPanic == nil {
reader = s.draw(stat)
}
reader, lines = s.extender(reader, s.reqWidth, stat)
b.frameCh <- &renderFrame{
reader: reader,
lines: lines + 1,
}
}
}
func (b *Bar) forceRefreshIfLastUncompleted() {
var uncompleted int
func (b *Bar) forceRefresh() {
var anyOtherRunning bool
b.container.traverseBars(func(bar *Bar) bool {
if b != bar && !bar.Completed() {
uncompleted++
return false
}
return true
anyOtherRunning = b != bar && bar.isRunning()
return !anyOtherRunning
})
if uncompleted == 0 {
if !anyOtherRunning {
for {
select {
case b.container.refreshCh <- time.Now():
time.Sleep(prr)
case <-b.done:
return
}
@@ -417,13 +422,25 @@ func (b *Bar) forceRefreshIfLastUncompleted() {
}
}
func (b *Bar) isRunning() bool {
result := make(chan bool)
select {
case b.operateState <- func(s *bState) {
result <- !s.completed && !s.aborted
}:
return <-result
case <-b.done:
return false
}
}
func (b *Bar) wSyncTable() [][]chan int {
result := make(chan [][]chan int)
select {
case b.operateState <- func(s *bState) { result <- s.wSyncTable() }:
return <-result
case <-b.done:
return b.cacheState.wSyncTable()
return b.bs.wSyncTable()
}
}
@@ -487,6 +504,26 @@ func (s *bState) wSyncTable() [][]chan int {
return table
}
func (s *bState) subscribeDecorators() {
for _, decorators := range [...][]decor.Decorator{
s.pDecorators,
s.aDecorators,
} {
for _, d := range decorators {
d = extractBaseDecorator(d)
if d, ok := d.(decor.AverageDecorator); ok {
s.averageDecorators = append(s.averageDecorators, d)
}
if d, ok := d.(decor.EwmaDecorator); ok {
s.ewmaDecorators = append(s.ewmaDecorators, d)
}
if d, ok := d.(decor.ShutdownListener); ok {
s.shutdownListeners = append(s.shutdownListeners, d)
}
}
}
}
func (s bState) decoratorEwmaUpdate(dur time.Duration) {
wg := new(sync.WaitGroup)
for i := 0; i < len(s.ewmaDecorators); i++ {
@@ -540,12 +577,12 @@ func (s bState) decoratorShutdownNotify() {
func newStatistics(tw int, s *bState) decor.Statistics {
return decor.Statistics{
ID: s.id,
AvailableWidth: tw,
ID: s.id,
Total: s.total,
Current: s.current,
Refill: s.refill,
Completed: s.completeFlushed,
Completed: s.completed,
Aborted: s.aborted,
}
}

View File

@@ -157,9 +157,8 @@ func (s *bFiller) Fill(w io.Writer, width int, stat decor.Statistics) {
return
}
ow := optimisticWriter(w)
ow(s.components[iLbound].bytes)
defer ow(s.components[iRbound].bytes)
mustWrite(w, s.components[iLbound].bytes)
defer mustWrite(w, s.components[iRbound].bytes)
if width == 0 {
return
@@ -231,26 +230,24 @@ func (s *bFiller) Fill(w io.Writer, width int, stat decor.Statistics) {
}
if s.rev {
flush(ow, padding, filling)
flush(w, padding, filling)
} else {
flush(ow, filling, padding)
flush(w, filling, padding)
}
}
func flush(ow func([]byte), filling, padding [][]byte) {
func flush(w io.Writer, filling, padding [][]byte) {
for i := len(filling) - 1; i >= 0; i-- {
ow(filling[i])
mustWrite(w, filling[i])
}
for i := 0; i < len(padding); i++ {
ow(padding[i])
mustWrite(w, padding[i])
}
}
func optimisticWriter(w io.Writer) func([]byte) {
return func(p []byte) {
_, err := w.Write(p)
if err != nil {
panic(err)
}
func mustWrite(w io.Writer, p []byte) {
_, err := w.Write(p)
if err != nil {
panic(err)
}
}

View File

@@ -59,14 +59,17 @@ func BarWidth(width int) BarOption {
}
}
// BarQueueAfter queues this (being constructed) bar to relplace
// runningBar after it has been completed.
func BarQueueAfter(runningBar *Bar) BarOption {
if runningBar == nil {
// BarQueueAfter puts this (being constructed) bar into the queue.
// When argument bar completes or aborts queued bar replaces its place.
// If sync is true queued bar is suspended until argument bar completes
// or aborts.
func BarQueueAfter(bar *Bar, sync bool) BarOption {
if bar == nil {
return nil
}
return func(s *bState) {
s.runningBar = runningBar
s.afterBar = bar
s.sync = sync
}
}

View File

@@ -11,7 +11,7 @@ import (
// ErrNotTTY not a TeleTYpewriter error.
var ErrNotTTY = errors.New("not a terminal")
// http://ascii-table.com/ansi-escape-sequences.php
// https://github.com/dylanaraps/pure-sh-bible#cursor-movement
const (
escOpen = "\x1b["
cuuAndEd = "A\x1b[J"

View File

@@ -47,8 +47,8 @@ const (
// Statistics consists of progress related statistics, that Decorator
// may need.
type Statistics struct {
ID int
AvailableWidth int
ID int
Total int64
Current int64
Refill int64

View File

@@ -2,11 +2,9 @@ package decor
import "io"
func optimisticStringWriter(w io.Writer) func(string) {
return func(s string) {
_, err := io.WriteString(w, s)
if err != nil {
panic(err)
}
func mustWriteString(w io.Writer, s string) {
_, err := io.WriteString(w, s)
if err != nil {
panic(err)
}
}

View File

@@ -23,12 +23,11 @@ func (s percentageType) Format(st fmt.State, verb rune) {
}
}
osw := optimisticStringWriter(st)
osw(strconv.FormatFloat(float64(s), 'f', prec, 64))
mustWriteString(st, strconv.FormatFloat(float64(s), 'f', prec, 64))
if st.Flag(' ') {
osw(" ")
mustWriteString(st, " ")
}
osw("%")
mustWriteString(st, "%")
}
// Percentage returns percentage decorator. It's a wrapper of NewPercentage.

View File

@@ -49,12 +49,11 @@ func (self SizeB1024) Format(st fmt.State, verb rune) {
unit = _iTiB
}
osw := optimisticStringWriter(st)
osw(strconv.FormatFloat(float64(self)/float64(unit), 'f', prec, 64))
mustWriteString(st, strconv.FormatFloat(float64(self)/float64(unit), 'f', prec, 64))
if st.Flag(' ') {
osw(" ")
mustWriteString(st, " ")
}
osw(unit.String())
mustWriteString(st, unit.String())
}
const (
@@ -98,10 +97,9 @@ func (self SizeB1000) Format(st fmt.State, verb rune) {
unit = _TB
}
osw := optimisticStringWriter(st)
osw(strconv.FormatFloat(float64(self)/float64(unit), 'f', prec, 64))
mustWriteString(st, strconv.FormatFloat(float64(self)/float64(unit), 'f', prec, 64))
if st.Flag(' ') {
osw(" ")
mustWriteString(st, " ")
}
osw(unit.String())
mustWriteString(st, unit.String())
}

View File

@@ -23,7 +23,7 @@ type speedFormatter struct {
func (self *speedFormatter) Format(st fmt.State, verb rune) {
self.Formatter.Format(st, verb)
optimisticStringWriter(st)("/s")
mustWriteString(st, "/s")
}
// EwmaSpeed exponential-weighted-moving-average based speed decorator.

View File

@@ -4,7 +4,7 @@ require (
github.com/VividCortex/ewma v1.2.0
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/mattn/go-runewidth v0.0.13
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9
golang.org/x/sys v0.0.0-20220209214540-3681064d5158
)
go 1.14

View File

@@ -6,5 +6,5 @@ github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@@ -16,12 +16,10 @@ import (
)
const (
// default RefreshRate
prr = 150 * time.Millisecond
prr = 150 * time.Millisecond // default RefreshRate
)
// Progress represents a container that renders one or more progress
// bars.
// Progress represents a container that renders one or more progress bars.
type Progress struct {
ctx context.Context
uwg *sync.WaitGroup
@@ -33,14 +31,12 @@ type Progress struct {
once sync.Once
}
// pState holds bars in its priorityQueue. It gets passed to
// *Progress.serve(...) monitor goroutine.
// pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
type pState struct {
bHeap priorityQueue
heapUpdated bool
pMatrix map[int][]chan int
aMatrix map[int][]chan int
barShutdownQueue []*Bar
bHeap priorityQueue
heapUpdated bool
pMatrix map[int][]chan int
aMatrix map[int][]chan int
// following are provided/overrided by user
idCount int
@@ -52,26 +48,26 @@ type pState struct {
externalRefresh <-chan interface{}
renderDelay <-chan struct{}
shutdownNotifier chan struct{}
parkedBars map[*Bar]*Bar
queueBars map[*Bar]*Bar
output io.Writer
debugOut io.Writer
}
// New creates new Progress container instance. It's not possible to
// reuse instance after *Progress.Wait() method has been called.
// reuse instance after (*Progress).Wait method has been called.
func New(options ...ContainerOption) *Progress {
return NewWithContext(context.Background(), options...)
}
// NewWithContext creates new Progress container instance with provided
// context. It's not possible to reuse instance after *Progress.Wait()
// context. It's not possible to reuse instance after (*Progress).Wait
// method has been called.
func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
s := &pState{
bHeap: priorityQueue{},
rr: prr,
parkedBars: make(map[*Bar]*Bar),
output: os.Stdout,
bHeap: priorityQueue{},
rr: prr,
queueBars: make(map[*Bar]*Bar),
output: os.Stdout,
}
for _, opt := range options {
@@ -110,8 +106,8 @@ func (p *Progress) New(total int64, builder BarFillerBuilder, options ...BarOpti
}
// Add creates a bar which renders itself by provided filler.
// If `total <= 0` trigger complete event is disabled until reset with *bar.SetTotal(int64, bool).
// Panics if *Progress instance is done, i.e. called after *Progress.Wait().
// If `total <= 0` triggering complete event by increment methods is disabled.
// Panics if *Progress instance is done, i.e. called after (*Progress).Wait().
func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) *Bar {
if filler == nil {
filler = NopStyle().Build()
@@ -122,9 +118,8 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) *Bar
case p.operateState <- func(ps *pState) {
bs := ps.makeBarState(total, filler, options...)
bar := newBar(p, bs)
if bs.runningBar != nil {
bs.runningBar.noPop = true
ps.parkedBars[bs.runningBar] = bar
if bs.afterBar != nil {
ps.queueBars[bs.afterBar] = bar
} else {
heap.Push(&ps.bHeap, bar)
ps.heapUpdated = true
@@ -133,7 +128,6 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) *Bar
result <- bar
}:
bar := <-result
bar.subscribeDecorators()
return bar
case <-p.done:
p.bwg.Done()
@@ -141,21 +135,8 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) *Bar
}
}
func (p *Progress) dropBar(b *Bar) {
select {
case p.operateState <- func(s *pState) {
if b.index < 0 {
return
}
heap.Remove(&s.bHeap, b.index)
s.heapUpdated = true
}:
case <-p.done:
}
}
func (p *Progress) traverseBars(cb func(b *Bar) bool) {
done := make(chan struct{})
sync := make(chan struct{})
select {
case p.operateState <- func(s *pState) {
for i := 0; i < s.bHeap.Len(); i++ {
@@ -164,9 +145,9 @@ func (p *Progress) traverseBars(cb func(b *Bar) bool) {
break
}
}
close(done)
close(sync)
}:
<-done
<-sync
case <-p.done:
}
}
@@ -200,8 +181,8 @@ func (p *Progress) BarCount() int {
// After this method has been called, there is no way to reuse *Progress
// instance.
func (p *Progress) Wait() {
// wait for user wg, if any
if p.uwg != nil {
// wait for user wg
p.uwg.Wait()
}
@@ -256,6 +237,64 @@ func (p *Progress) serve(s *pState, cw *cwriter.Writer) {
}
}
func (s *pState) render(cw *cwriter.Writer) error {
if s.heapUpdated {
s.updateSyncMatrix()
s.heapUpdated = false
}
syncWidth(s.pMatrix)
syncWidth(s.aMatrix)
tw, err := cw.GetWidth()
if err != nil {
tw = s.reqWidth
}
for i := 0; i < s.bHeap.Len(); i++ {
bar := s.bHeap[i]
go bar.render(tw)
}
return s.flush(cw)
}
func (s *pState) flush(cw *cwriter.Writer) error {
var lines int
pool := make([]*Bar, 0, s.bHeap.Len())
for s.bHeap.Len() > 0 {
b := heap.Pop(&s.bHeap).(*Bar)
frame := <-b.frameCh
lines += frame.lines
_, err := cw.ReadFrom(frame.reader)
if err != nil {
return err
}
if frame.shutdown {
b.Wait() // waiting for b.done, so it's safe to read b.bs
var toDrop bool
if qb, ok := s.queueBars[b]; ok {
delete(s.queueBars, b)
qb.priority = b.priority
pool = append(pool, qb)
toDrop = true
} else if s.popCompleted && !b.bs.noPop {
lines -= frame.lines
toDrop = true
}
if toDrop || b.bs.dropOnComplete {
s.heapUpdated = true
continue
}
}
pool = append(pool, b)
}
for _, b := range pool {
heap.Push(&s.bHeap, b)
}
return cw.Flush(lines)
}
func (s *pState) newTicker(done <-chan struct{}) chan time.Time {
ch := make(chan time.Time)
if s.shutdownNotifier == nil {
@@ -294,78 +333,6 @@ func (s *pState) newTicker(done <-chan struct{}) chan time.Time {
return ch
}
func (s *pState) render(cw *cwriter.Writer) error {
if s.heapUpdated {
s.updateSyncMatrix()
s.heapUpdated = false
}
syncWidth(s.pMatrix)
syncWidth(s.aMatrix)
tw, err := cw.GetWidth()
if err != nil {
tw = s.reqWidth
}
for i := 0; i < s.bHeap.Len(); i++ {
bar := s.bHeap[i]
go bar.render(tw)
}
return s.flush(cw)
}
func (s *pState) flush(cw *cwriter.Writer) error {
var totalLines int
bm := make(map[*Bar]int, s.bHeap.Len())
for s.bHeap.Len() > 0 {
b := heap.Pop(&s.bHeap).(*Bar)
frame := <-b.frameCh
_, err := cw.ReadFrom(frame.reader)
if err != nil {
return err
}
if b.toShutdown {
if b.recoveredPanic != nil {
s.barShutdownQueue = append(s.barShutdownQueue, b)
b.toShutdown = false
} else {
// shutdown at next flush
// this ensures no bar ends up with less than 100% rendered
defer func() {
s.barShutdownQueue = append(s.barShutdownQueue, b)
}()
}
}
bm[b] = frame.lines
totalLines += frame.lines
}
for _, b := range s.barShutdownQueue {
if parkedBar := s.parkedBars[b]; parkedBar != nil {
parkedBar.priority = b.priority
heap.Push(&s.bHeap, parkedBar)
delete(s.parkedBars, b)
b.toDrop = true
}
if s.popCompleted && !b.noPop {
totalLines -= bm[b]
b.toDrop = true
}
if b.toDrop {
delete(bm, b)
s.heapUpdated = true
}
b.cancel()
}
s.barShutdownQueue = s.barShutdownQueue[0:0]
for b := range bm {
heap.Push(&s.bHeap, b)
}
return cw.Flush(totalLines)
}
func (s *pState) updateSyncMatrix() {
s.pMatrix = make(map[int][]chan int)
s.aMatrix = make(map[int][]chan int)
@@ -418,6 +385,8 @@ func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOptio
bs.buffers[i] = bytes.NewBuffer(make([]byte, 0, 512))
}
bs.subscribeDecorators()
return bs
}
@@ -427,7 +396,7 @@ func syncWidth(matrix map[int][]chan int) {
}
}
var maxWidthDistributor = func(column []chan int) {
func maxWidthDistributor(column []chan int) {
var maxWidth int
for _, ch := range column {
if w := <-ch; w > maxWidth {

View File

@@ -14,9 +14,6 @@ type proxyReader struct {
func (x proxyReader) Read(p []byte) (int, error) {
n, err := x.ReadCloser.Read(p)
x.bar.IncrBy(n)
if err == io.EOF {
go x.bar.SetTotal(-1, true)
}
return n, err
}
@@ -28,9 +25,6 @@ type proxyWriterTo struct {
func (x proxyWriterTo) WriteTo(w io.Writer) (int64, error) {
n, err := x.wt.WriteTo(w)
x.bar.IncrInt64(n)
if err == io.EOF {
go x.bar.SetTotal(-1, true)
}
return n, err
}
@@ -65,12 +59,12 @@ func (b *Bar) newProxyReader(r io.Reader) (rc io.ReadCloser) {
pr := proxyReader{toReadCloser(r), b}
if wt, ok := r.(io.WriterTo); ok {
pw := proxyWriterTo{pr, wt}
if b.hasEwmaDecorators {
if b.hasEwma {
rc = ewmaProxyWriterTo{ewmaProxyReader{pr}, pw}
} else {
rc = pw
}
} else if b.hasEwmaDecorators {
} else if b.hasEwma {
rc = ewmaProxyReader{pr}
} else {
rc = pr