skopeo/vendor/github.com/vbauerster/mpb/v8/bar.go
renovate[bot] f17b4c9672
Update module github.com/containers/image/v5 to v5.36.0
Signed-off-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-15 21:58:37 +00:00

583 lines
14 KiB
Go

package mpb
import (
"bytes"
"context"
"io"
"strings"
"sync"
"time"
"github.com/acarl005/stripansi"
"github.com/mattn/go-runewidth"
"github.com/vbauerster/mpb/v8/decor"
)
// Bar represents a progress bar.
type Bar struct {
index int // used by heap
priority int // used by heap
frameCh chan *renderFrame
operateState chan func(*bState)
container *Progress
bs *bState
bsOk chan struct{}
ctx context.Context
cancel func()
}
type syncTable [2][]chan int
type extenderFunc func(decor.Statistics, ...io.Reader) ([]io.Reader, error)
// bState is actual bar's state.
type bState struct {
id int
priority int
reqWidth int
shutdown int
total int64
current int64
refill int64
trimSpace bool
aborted bool
triggerComplete bool
rmOnComplete bool
noPop bool
autoRefresh bool
buffers [3]*bytes.Buffer
decorGroups [2][]decor.Decorator
ewmaDecorators []decor.EwmaDecorator
filler BarFiller
extender extenderFunc
renderReq chan<- time.Time
waitBar *Bar // key for (*pState).queueBars
}
type renderFrame struct {
rows []io.Reader
shutdown int
rmOnComplete bool
noPop bool
err error
}
func newBar(ctx context.Context, container *Progress, bs *bState) *Bar {
ctx, cancel := context.WithCancel(ctx)
bar := &Bar{
priority: bs.priority,
frameCh: make(chan *renderFrame, 1),
operateState: make(chan func(*bState)),
bsOk: make(chan struct{}),
container: container,
ctx: ctx,
cancel: cancel,
}
container.bwg.Add(1)
go bar.serve(bs)
return bar
}
// ProxyReader wraps io.Reader with metrics required for progress
// tracking. If `r` is 'unknown total/size' reader it's mandatory
// to call `(*Bar).SetTotal(-1, true)` after the wrapper returns
// `io.EOF`. If bar is already completed or aborted, returns nil.
// Panics if `r` is nil.
func (b *Bar) ProxyReader(r io.Reader) io.ReadCloser {
if r == nil {
panic("expected non nil io.Reader")
}
result := make(chan io.ReadCloser)
select {
case b.operateState <- func(s *bState) {
result <- newProxyReader(r, b, len(s.ewmaDecorators) != 0)
}:
return <-result
case <-b.ctx.Done():
return nil
}
}
// ProxyWriter wraps io.Writer with metrics required for progress tracking.
// If bar is already completed or aborted, returns nil.
// Panics if `w` is nil.
func (b *Bar) ProxyWriter(w io.Writer) io.WriteCloser {
if w == nil {
panic("expected non nil io.Writer")
}
result := make(chan io.WriteCloser)
select {
case b.operateState <- func(s *bState) {
result <- newProxyWriter(w, b, len(s.ewmaDecorators) != 0)
}:
return <-result
case <-b.ctx.Done():
return nil
}
}
// ID returns id of the bar.
func (b *Bar) ID() int {
result := make(chan int)
select {
case b.operateState <- func(s *bState) { result <- s.id }:
return <-result
case <-b.bsOk:
return b.bs.id
}
}
// Current returns bar's current value, in other words sum of all increments.
func (b *Bar) Current() int64 {
result := make(chan int64)
select {
case b.operateState <- func(s *bState) { result <- s.current }:
return <-result
case <-b.bsOk:
return b.bs.current
}
}
// SetRefill sets refill flag with specified amount.
// The underlying BarFiller will change its visual representation, to
// indicate refill event. Refill event may be referred to some retry
// operation for example.
func (b *Bar) SetRefill(amount int64) {
select {
case b.operateState <- func(s *bState) { s.refill = min(amount, s.current) }:
case <-b.ctx.Done():
}
}
// TraverseDecorators traverses available decorators and calls `cb`
// on each unwrapped one.
func (b *Bar) TraverseDecorators(cb func(decor.Decorator)) {
select {
case b.operateState <- func(s *bState) {
for _, group := range s.decorGroups {
for _, d := range group {
cb(unwrap(d))
}
}
}:
case <-b.ctx.Done():
}
}
// EnableTriggerComplete enables triggering complete event. It's effective
// only for bars which were constructed with `total <= 0`. If `current >= total`
// at the moment of call, complete event is triggered right away.
func (b *Bar) EnableTriggerComplete() {
select {
case b.operateState <- func(s *bState) {
if s.triggerComplete {
return
}
if s.current >= s.total {
s.current = s.total
s.triggerCompletion(b)
} else {
s.triggerComplete = true
}
}:
case <-b.ctx.Done():
}
}
// SetTotal sets total to an arbitrary value. It's effective only for bar
// which was constructed with `total <= 0`. Setting total to negative value
// is equivalent to `(*Bar).SetTotal((*Bar).Current(), bool)` but faster.
// If `complete` is true complete event is triggered right away.
// Calling `(*Bar).EnableTriggerComplete` makes this one no operational.
func (b *Bar) SetTotal(total int64, complete bool) {
select {
case b.operateState <- func(s *bState) {
if s.triggerComplete {
return
}
if total < 0 {
s.total = s.current
} else {
s.total = total
}
if complete {
s.current = s.total
s.triggerCompletion(b)
}
}:
case <-b.ctx.Done():
}
}
// SetCurrent sets progress' current to an arbitrary value.
func (b *Bar) SetCurrent(current int64) {
if current < 0 {
return
}
select {
case b.operateState <- func(s *bState) {
s.current = current
if s.triggerComplete && s.current >= s.total {
s.current = s.total
s.triggerCompletion(b)
}
}:
case <-b.ctx.Done():
}
}
// Increment is a shorthand for b.IncrInt64(1).
func (b *Bar) Increment() {
b.IncrInt64(1)
}
// IncrBy is a shorthand for b.IncrInt64(int64(n)).
func (b *Bar) IncrBy(n int) {
b.IncrInt64(int64(n))
}
// IncrInt64 increments progress by amount of n.
func (b *Bar) IncrInt64(n int64) {
select {
case b.operateState <- func(s *bState) {
s.current += n
if s.triggerComplete && s.current >= s.total {
s.current = s.total
s.triggerCompletion(b)
}
}:
case <-b.ctx.Done():
}
}
// EwmaIncrement is a shorthand for b.EwmaIncrInt64(1, iterDur).
func (b *Bar) EwmaIncrement(iterDur time.Duration) {
b.EwmaIncrInt64(1, iterDur)
}
// EwmaIncrBy is a shorthand for b.EwmaIncrInt64(int64(n), iterDur).
func (b *Bar) EwmaIncrBy(n int, iterDur time.Duration) {
b.EwmaIncrInt64(int64(n), iterDur)
}
// EwmaIncrInt64 increments progress by amount of n and updates EWMA based
// decorators by dur of a single iteration.
func (b *Bar) EwmaIncrInt64(n int64, iterDur time.Duration) {
select {
case b.operateState <- func(s *bState) {
var wg sync.WaitGroup
wg.Add(len(s.ewmaDecorators))
for _, d := range s.ewmaDecorators {
// d := d // NOTE: uncomment for Go < 1.22, see /doc/faq#closures_and_goroutines
go func() {
defer wg.Done()
d.EwmaUpdate(n, iterDur)
}()
}
s.current += n
if s.triggerComplete && s.current >= s.total {
s.current = s.total
s.triggerCompletion(b)
}
wg.Wait()
}:
case <-b.ctx.Done():
}
}
// EwmaSetCurrent sets progress' current to an arbitrary value and updates
// EWMA based decorators by dur of a single iteration.
func (b *Bar) EwmaSetCurrent(current int64, iterDur time.Duration) {
if current < 0 {
return
}
select {
case b.operateState <- func(s *bState) {
n := current - s.current
var wg sync.WaitGroup
wg.Add(len(s.ewmaDecorators))
for _, d := range s.ewmaDecorators {
// d := d // NOTE: uncomment for Go < 1.22, see /doc/faq#closures_and_goroutines
go func() {
defer wg.Done()
d.EwmaUpdate(n, iterDur)
}()
}
s.current = current
if s.triggerComplete && s.current >= s.total {
s.current = s.total
s.triggerCompletion(b)
}
wg.Wait()
}:
case <-b.ctx.Done():
}
}
// DecoratorAverageAdjust adjusts decorators implementing decor.AverageDecorator interface.
// Call if there is need to set start time after decorators have been constructed.
func (b *Bar) DecoratorAverageAdjust(start time.Time) {
b.TraverseDecorators(func(d decor.Decorator) {
if d, ok := d.(decor.AverageDecorator); ok {
d.AverageAdjust(start)
}
})
}
// SetPriority changes bar's order among multiple bars. Zero is highest
// priority, i.e. bar will be on top. If you don't need to set priority
// dynamically, better use BarPriority option.
func (b *Bar) SetPriority(priority int) {
b.container.UpdateBarPriority(b, priority, false)
}
// Abort interrupts bar's running goroutine. Abort won't be engaged
// if bar is already in complete state. If drop is true bar will be
// removed as well. To make sure that bar has been removed call
// `(*Bar).Wait()` method.
func (b *Bar) Abort(drop bool) {
select {
case b.operateState <- func(s *bState) {
if s.aborted || s.completed() {
return
}
s.aborted = true
s.rmOnComplete = drop
s.triggerCompletion(b)
}:
case <-b.ctx.Done():
}
}
// Aborted reports whether the bar is in aborted state.
func (b *Bar) Aborted() bool {
result := make(chan bool)
select {
case b.operateState <- func(s *bState) { result <- s.aborted }:
return <-result
case <-b.bsOk:
return b.bs.aborted
}
}
// Completed reports whether the bar is in completed state.
func (b *Bar) Completed() bool {
result := make(chan bool)
select {
case b.operateState <- func(s *bState) { result <- s.completed() }:
return <-result
case <-b.bsOk:
return b.bs.completed()
}
}
// IsRunning reports whether the bar is in running state.
func (b *Bar) IsRunning() bool {
select {
case <-b.ctx.Done():
return false
default:
return true
}
}
// Wait blocks until bar is completed or aborted.
func (b *Bar) Wait() {
<-b.bsOk
}
func (b *Bar) serve(bs *bState) {
defer b.container.bwg.Done()
decoratorsOnShutdown := func(group []decor.Decorator) {
for _, d := range group {
if d, ok := unwrap(d).(decor.ShutdownListener); ok {
b.container.bwg.Add(1)
go func() {
defer b.container.bwg.Done()
d.OnShutdown()
}()
}
}
}
for {
select {
case op := <-b.operateState:
op(bs)
case <-b.ctx.Done():
decoratorsOnShutdown(bs.decorGroups[0])
decoratorsOnShutdown(bs.decorGroups[1])
// bar can be aborted by canceling parent ctx without calling b.Abort
bs.aborted = !bs.completed()
b.bs = bs
close(b.bsOk)
return
}
}
}
func (b *Bar) render(tw int) {
fn := func(s *bState) {
frame := new(renderFrame)
stat := s.newStatistics(tw)
r, err := s.draw(stat)
if err != nil {
for _, buf := range s.buffers {
buf.Reset()
}
frame.err = err
b.frameCh <- frame
return
}
frame.rows, frame.err = s.extender(stat, r)
if s.aborted || s.completed() {
frame.shutdown = s.shutdown
frame.rmOnComplete = s.rmOnComplete
frame.noPop = s.noPop
// post increment makes sure OnComplete decorators are rendered
s.shutdown++
}
b.frameCh <- frame
}
select {
case b.operateState <- fn:
case <-b.bsOk:
fn(b.bs)
}
}
func (b *Bar) tryEarlyRefresh(renderReq chan<- time.Time) {
var otherRunning int
b.container.traverseBars(func(bar *Bar) bool {
if b != bar && bar.IsRunning() {
otherRunning++
return false // stop traverse
}
return true // continue traverse
})
if otherRunning == 0 {
for {
select {
case renderReq <- time.Now():
case <-b.ctx.Done():
return
}
}
}
}
func (b *Bar) wSyncTable() syncTable {
result := make(chan syncTable)
select {
case b.operateState <- func(s *bState) { result <- s.wSyncTable() }:
return <-result
case <-b.bsOk:
return b.bs.wSyncTable()
}
}
func (s *bState) draw(stat decor.Statistics) (_ io.Reader, err error) {
decorFiller := func(buf *bytes.Buffer, group []decor.Decorator) (err error) {
for _, d := range group {
// need to call Decor in any case because of width synchronization
str, width := d.Decor(stat)
if err != nil {
continue
}
if w := stat.AvailableWidth - width; w >= 0 {
_, err = buf.WriteString(str)
stat.AvailableWidth = w
} else if stat.AvailableWidth > 0 {
trunc := runewidth.Truncate(stripansi.Strip(str), stat.AvailableWidth, "…")
_, err = buf.WriteString(trunc)
stat.AvailableWidth = 0
}
}
return err
}
for i, buf := range s.buffers[:2] {
err = decorFiller(buf, s.decorGroups[i])
if err != nil {
return nil, err
}
}
spaces := []io.Reader{
strings.NewReader(" "),
strings.NewReader(" "),
}
if s.trimSpace || stat.AvailableWidth < 2 {
for _, r := range spaces {
_, _ = io.Copy(io.Discard, r)
}
} else {
stat.AvailableWidth -= 2
}
err = s.filler.Fill(s.buffers[2], stat)
if err != nil {
return nil, err
}
return io.MultiReader(
s.buffers[0],
spaces[0],
s.buffers[2],
spaces[1],
s.buffers[1],
strings.NewReader("\n"),
), nil
}
func (s *bState) wSyncTable() (table syncTable) {
var start int
var row []chan int
for i, group := range s.decorGroups {
for _, d := range group {
if ch, ok := d.Sync(); ok {
row = append(row, ch)
}
}
table[i], start = row[start:], len(row)
}
return table
}
func (s *bState) triggerCompletion(b *Bar) {
s.triggerComplete = true
if s.autoRefresh {
// Technically this call isn't required, but if refresh rate is set to
// one hour for example and bar completes within a few minutes p.Wait()
// will wait for one hour. This call helps to avoid unnecessary waiting.
go b.tryEarlyRefresh(s.renderReq)
} else {
b.cancel()
}
}
func (s bState) completed() bool {
return s.triggerComplete && s.current == s.total
}
func (s bState) newStatistics(tw int) decor.Statistics {
return decor.Statistics{
AvailableWidth: tw,
RequestedWidth: s.reqWidth,
ID: s.id,
Total: s.total,
Current: s.current,
Refill: s.refill,
Completed: s.completed(),
Aborted: s.aborted,
}
}
func unwrap(d decor.Decorator) decor.Decorator {
if d, ok := d.(decor.Wrapper); ok {
return unwrap(d.Unwrap())
}
return d
}