mirror of
https://github.com/containers/skopeo.git
synced 2025-08-04 16:20:04 +00:00
464 lines
11 KiB
Go
464 lines
11 KiB
Go
package mpb
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/vbauerster/mpb/v8/cwriter"
|
|
"github.com/vbauerster/mpb/v8/decor"
|
|
)
|
|
|
|
// defaultRefreshRate is the refresh interval a new pState starts with;
// it feeds the ticker in autoRefreshListener unless overridden by an option.
const defaultRefreshRate time.Duration = 150 * time.Millisecond
|
|
// defaultHmQueueLength (128) is the initial pState.hmQueueLen, which sizes
// the heapManager channel created in NewWithContext.
const defaultHmQueueLength = 1 << 7
|
|
|
|
// ErrDone represents use after `(*Progress).Wait()` error.
|
|
var ErrDone = fmt.Errorf("%T instance can't be reused after %[1]T.Wait()", (*Progress)(nil))
|
|
|
|
// Progress represents a container that renders one or more progress bars.
|
|
type Progress struct {
|
|
uwg *sync.WaitGroup
|
|
pwg, bwg sync.WaitGroup
|
|
operateState chan func(*pState)
|
|
interceptIO chan func(io.Writer)
|
|
done <-chan struct{}
|
|
cancel func()
|
|
}
|
|
|
|
// pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
|
|
type pState struct {
|
|
ctx context.Context
|
|
hm heapManager
|
|
iterDrop chan struct{}
|
|
renderReq chan time.Time
|
|
idCount int
|
|
popPriority int
|
|
|
|
// following are provided/overrode by user
|
|
hmQueueLen int
|
|
reqWidth int
|
|
refreshRate time.Duration
|
|
popCompleted bool
|
|
autoRefresh bool
|
|
delayRC <-chan struct{}
|
|
manualRC <-chan interface{}
|
|
shutdownNotifier chan<- interface{}
|
|
queueBars map[*Bar]*Bar
|
|
output io.Writer
|
|
debugOut io.Writer
|
|
uwg *sync.WaitGroup
|
|
}
|
|
|
|
// New creates new Progress container instance. It's not possible to
|
|
// reuse instance after `(*Progress).Wait` method has been called.
|
|
func New(options ...ContainerOption) *Progress {
|
|
return NewWithContext(context.Background(), options...)
|
|
}
|
|
|
|
// NewWithContext creates new Progress container instance with provided
|
|
// context. It's not possible to reuse instance after `(*Progress).Wait`
|
|
// method has been called.
|
|
func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
s := &pState{
|
|
ctx: ctx,
|
|
hmQueueLen: defaultHmQueueLength,
|
|
iterDrop: make(chan struct{}),
|
|
renderReq: make(chan time.Time),
|
|
popPriority: math.MinInt32,
|
|
refreshRate: defaultRefreshRate,
|
|
queueBars: make(map[*Bar]*Bar),
|
|
output: os.Stdout,
|
|
debugOut: io.Discard,
|
|
}
|
|
|
|
for _, opt := range options {
|
|
if opt != nil {
|
|
opt(s)
|
|
}
|
|
}
|
|
|
|
s.hm = make(heapManager, s.hmQueueLen)
|
|
|
|
p := &Progress{
|
|
uwg: s.uwg,
|
|
operateState: make(chan func(*pState)),
|
|
interceptIO: make(chan func(io.Writer)),
|
|
cancel: cancel,
|
|
}
|
|
|
|
cw := cwriter.New(s.output)
|
|
if s.manualRC != nil {
|
|
done := make(chan struct{})
|
|
p.done = done
|
|
s.autoRefresh = false
|
|
go s.manualRefreshListener(done)
|
|
} else if cw.IsTerminal() || s.autoRefresh {
|
|
done := make(chan struct{})
|
|
p.done = done
|
|
s.autoRefresh = true
|
|
go s.autoRefreshListener(done)
|
|
} else {
|
|
p.done = ctx.Done()
|
|
s.autoRefresh = false
|
|
}
|
|
|
|
p.pwg.Add(1)
|
|
go p.serve(s, cw)
|
|
go s.hm.run()
|
|
return p
|
|
}
|
|
|
|
// AddBar creates a bar with default bar filler.
|
|
func (p *Progress) AddBar(total int64, options ...BarOption) *Bar {
|
|
return p.New(total, BarStyle(), options...)
|
|
}
|
|
|
|
// AddSpinner creates a bar with default spinner filler.
|
|
func (p *Progress) AddSpinner(total int64, options ...BarOption) *Bar {
|
|
return p.New(total, SpinnerStyle(), options...)
|
|
}
|
|
|
|
// New creates a bar by calling `Build` method on provided `BarFillerBuilder`.
|
|
func (p *Progress) New(total int64, builder BarFillerBuilder, options ...BarOption) *Bar {
|
|
if builder == nil {
|
|
return p.MustAdd(total, nil, options...)
|
|
}
|
|
return p.MustAdd(total, builder.Build(), options...)
|
|
}
|
|
|
|
// MustAdd creates a bar which renders itself by provided BarFiller.
|
|
// If `total <= 0` triggering complete event by increment methods is
|
|
// disabled. Panics if called after `(*Progress).Wait()`.
|
|
func (p *Progress) MustAdd(total int64, filler BarFiller, options ...BarOption) *Bar {
|
|
bar, err := p.Add(total, filler, options...)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return bar
|
|
}
|
|
|
|
// Add creates a bar which renders itself by provided BarFiller.
|
|
// If `total <= 0` triggering complete event by increment methods
|
|
// is disabled. If called after `(*Progress).Wait()` then
|
|
// `(nil, ErrDone)` is returned.
|
|
func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) (*Bar, error) {
|
|
if filler == nil {
|
|
filler = NopStyle().Build()
|
|
} else if f, ok := filler.(BarFillerFunc); ok && f == nil {
|
|
filler = NopStyle().Build()
|
|
}
|
|
ch := make(chan *Bar)
|
|
select {
|
|
case p.operateState <- func(ps *pState) {
|
|
bs := ps.makeBarState(total, filler, options...)
|
|
bar := newBar(ps.ctx, p, bs)
|
|
if bs.waitBar != nil {
|
|
ps.queueBars[bs.waitBar] = bar
|
|
} else {
|
|
ps.hm.push(bar, true)
|
|
}
|
|
ps.idCount++
|
|
ch <- bar
|
|
}:
|
|
return <-ch, nil
|
|
case <-p.done:
|
|
return nil, ErrDone
|
|
}
|
|
}
|
|
|
|
func (p *Progress) traverseBars(cb func(b *Bar) bool) {
|
|
drop, iter := make(chan struct{}), make(chan *Bar)
|
|
select {
|
|
case p.operateState <- func(s *pState) { s.hm.iter(drop, iter, nil) }:
|
|
for b := range iter {
|
|
if !cb(b) {
|
|
close(drop)
|
|
break
|
|
}
|
|
}
|
|
case <-p.done:
|
|
}
|
|
}
|
|
|
|
// UpdateBarPriority either immediately or lazy.
|
|
// With lazy flag order is updated after the next refresh cycle.
|
|
// If you don't care about laziness just use `(*Bar).SetPriority(int)`.
|
|
func (p *Progress) UpdateBarPriority(b *Bar, priority int, lazy bool) {
|
|
if b == nil {
|
|
return
|
|
}
|
|
select {
|
|
case p.operateState <- func(s *pState) { s.hm.fix(b, priority, lazy) }:
|
|
case <-p.done:
|
|
}
|
|
}
|
|
|
|
// Write is implementation of io.Writer.
|
|
// Writing to `*Progress` will print lines above a running bar.
|
|
// Writes aren't flushed immediately, but at next refresh cycle.
|
|
// If called after `(*Progress).Wait()` then `(0, ErrDone)` is returned.
|
|
func (p *Progress) Write(b []byte) (int, error) {
|
|
type result struct {
|
|
n int
|
|
err error
|
|
}
|
|
ch := make(chan result)
|
|
select {
|
|
case p.interceptIO <- func(w io.Writer) {
|
|
n, err := w.Write(b)
|
|
ch <- result{n, err}
|
|
}:
|
|
res := <-ch
|
|
return res.n, res.err
|
|
case <-p.done:
|
|
return 0, ErrDone
|
|
}
|
|
}
|
|
|
|
// Wait waits for all bars to complete and finally shutdowns container. After
|
|
// this method has been called, there is no way to reuse `*Progress` instance.
|
|
func (p *Progress) Wait() {
|
|
p.bwg.Wait()
|
|
p.Shutdown()
|
|
// wait for user wg, if any
|
|
if p.uwg != nil {
|
|
p.uwg.Wait()
|
|
}
|
|
}
|
|
|
|
// Shutdown cancels any running bar immediately and then shutdowns `*Progress`
|
|
// instance. Normally this method shouldn't be called unless you know what you
|
|
// are doing. Proper way to shutdown is to call `(*Progress).Wait()` instead.
|
|
func (p *Progress) Shutdown() {
|
|
p.cancel()
|
|
p.pwg.Wait()
|
|
}
|
|
|
|
func (p *Progress) serve(s *pState, cw *cwriter.Writer) {
|
|
defer p.pwg.Done()
|
|
var err error
|
|
var w *cwriter.Writer
|
|
renderReq := s.renderReq
|
|
operateState := p.operateState
|
|
interceptIO := p.interceptIO
|
|
|
|
if s.delayRC != nil {
|
|
w = cwriter.New(io.Discard)
|
|
} else {
|
|
w, cw = cw, nil
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-s.delayRC:
|
|
w, cw = cw, nil
|
|
s.delayRC = nil
|
|
case op := <-operateState:
|
|
op(s)
|
|
case fn := <-interceptIO:
|
|
fn(w)
|
|
case <-renderReq:
|
|
err = s.render(w)
|
|
if err != nil {
|
|
// (*pState).(autoRefreshListener|manualRefreshListener) may block
|
|
// if not launching following short lived goroutine
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-s.renderReq:
|
|
case <-p.done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
p.cancel() // cancel all bars
|
|
renderReq = nil
|
|
operateState = nil
|
|
interceptIO = nil
|
|
}
|
|
case <-p.done:
|
|
if err != nil {
|
|
_, _ = fmt.Fprintln(s.debugOut, err.Error())
|
|
} else if s.autoRefresh {
|
|
update := make(chan bool)
|
|
for i := 0; i == 0 || <-update; i++ {
|
|
if err := s.render(w); err != nil {
|
|
_, _ = fmt.Fprintln(s.debugOut, err.Error())
|
|
break
|
|
}
|
|
s.hm.state(update)
|
|
}
|
|
}
|
|
s.hm.end(s.shutdownNotifier)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *pState) autoRefreshListener(done chan struct{}) {
|
|
ticker := time.NewTicker(s.refreshRate)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case t := <-ticker.C:
|
|
s.renderReq <- t
|
|
case <-s.ctx.Done():
|
|
close(done)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *pState) manualRefreshListener(done chan struct{}) {
|
|
for {
|
|
select {
|
|
case x := <-s.manualRC:
|
|
if t, ok := x.(time.Time); ok {
|
|
s.renderReq <- t
|
|
} else {
|
|
s.renderReq <- time.Now()
|
|
}
|
|
case <-s.ctx.Done():
|
|
close(done)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *pState) render(cw *cwriter.Writer) (err error) {
|
|
iter, iterPop := make(chan *Bar), make(chan *Bar)
|
|
s.hm.sync(s.iterDrop)
|
|
s.hm.iter(s.iterDrop, iter, iterPop)
|
|
|
|
var width, height int
|
|
if cw.IsTerminal() {
|
|
width, height, err = cw.GetTermSize()
|
|
if err != nil {
|
|
close(s.iterDrop)
|
|
return err
|
|
}
|
|
} else {
|
|
if s.reqWidth > 0 {
|
|
width = s.reqWidth
|
|
} else {
|
|
width = 80
|
|
}
|
|
height = width
|
|
}
|
|
|
|
var barCount int
|
|
for b := range iter {
|
|
barCount++
|
|
go b.render(width)
|
|
}
|
|
|
|
return s.flush(cw, height, barCount, iterPop)
|
|
}
|
|
|
|
func (s *pState) flush(cw *cwriter.Writer, height, barCount int, iter <-chan *Bar) error {
|
|
var total, popCount int
|
|
rows := make([][]io.Reader, 0, barCount)
|
|
|
|
for b := range iter {
|
|
frame := <-b.frameCh
|
|
if frame.err != nil {
|
|
close(s.iterDrop)
|
|
b.cancel()
|
|
return frame.err // b.frameCh is buffered it's ok to return here
|
|
}
|
|
var discarded int
|
|
for i := len(frame.rows) - 1; i >= 0; i-- {
|
|
if total < height {
|
|
total++
|
|
} else {
|
|
_, _ = io.Copy(io.Discard, frame.rows[i]) // Found IsInBounds
|
|
discarded++
|
|
}
|
|
}
|
|
rows = append(rows, frame.rows)
|
|
|
|
switch frame.shutdown {
|
|
case 1:
|
|
b.cancel()
|
|
if qb, ok := s.queueBars[b]; ok {
|
|
delete(s.queueBars, b)
|
|
qb.priority = b.priority
|
|
s.hm.push(qb, true)
|
|
} else if s.popCompleted && !frame.noPop {
|
|
b.priority = s.popPriority
|
|
s.popPriority++
|
|
s.hm.push(b, false)
|
|
} else if !frame.rmOnComplete {
|
|
s.hm.push(b, false)
|
|
}
|
|
case 2:
|
|
if s.popCompleted && !frame.noPop {
|
|
popCount += len(frame.rows) - discarded
|
|
continue
|
|
}
|
|
fallthrough
|
|
default:
|
|
s.hm.push(b, false)
|
|
}
|
|
}
|
|
|
|
for i := len(rows) - 1; i >= 0; i-- {
|
|
for _, r := range rows[i] {
|
|
_, err := cw.ReadFrom(r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return cw.Flush(total - popCount)
|
|
}
|
|
|
|
func (s pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState {
|
|
bs := &bState{
|
|
id: s.idCount,
|
|
priority: s.idCount,
|
|
reqWidth: s.reqWidth,
|
|
total: total,
|
|
filler: filler,
|
|
renderReq: s.renderReq,
|
|
autoRefresh: s.autoRefresh,
|
|
extender: func(_ decor.Statistics, rows ...io.Reader) ([]io.Reader, error) {
|
|
return rows, nil
|
|
},
|
|
}
|
|
|
|
if total > 0 {
|
|
bs.triggerComplete = true
|
|
}
|
|
|
|
for _, opt := range options {
|
|
if opt != nil {
|
|
opt(bs)
|
|
}
|
|
}
|
|
|
|
for _, group := range bs.decorGroups {
|
|
for _, d := range group {
|
|
if d, ok := unwrap(d).(decor.EwmaDecorator); ok {
|
|
bs.ewmaDecorators = append(bs.ewmaDecorators, d)
|
|
}
|
|
}
|
|
}
|
|
|
|
bs.buffers[0] = bytes.NewBuffer(make([]byte, 0, 128)) // prepend
|
|
bs.buffers[1] = bytes.NewBuffer(make([]byte, 0, 128)) // append
|
|
bs.buffers[2] = bytes.NewBuffer(make([]byte, 0, 256)) // filler
|
|
|
|
return bs
|
|
}
|