diff --git a/go.mod b/go.mod index d35e4915..a5a3b910 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/Sabayon/pkgs-checker v0.7.2 github.com/asdine/storm v0.0.0-20190418133842-e0f77eada154 github.com/briandowns/spinner v1.7.0 - github.com/cavaliercoder/grab v2.0.0+incompatible + github.com/cavaliercoder/grab v1.0.1-0.20201108051000-98a5bfe305ec github.com/crillab/gophersat v1.3.2-0.20201023142334-3fc2ac466765 github.com/docker/docker v17.12.0-ce-rc1.0.20200417035958-130b0bc6032c+incompatible github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect diff --git a/go.sum b/go.sum index 31f00e99..70ccfc97 100644 --- a/go.sum +++ b/go.sum @@ -97,6 +97,8 @@ github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8n github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= +github.com/cavaliercoder/grab v1.0.1-0.20201108051000-98a5bfe305ec h1:4XvMn0XuV7qxCH22gbnR79r+xTUaLOSA0GW/egpO3SQ= +github.com/cavaliercoder/grab v1.0.1-0.20201108051000-98a5bfe305ec/go.mod h1:NbXoa59CCAGqtRm7kRrcZIk2dTCJMRVF8QI3BOD7isY= github.com/cavaliercoder/grab v2.0.0+incompatible h1:wZHbBQx56+Yxjx2TCGDcenhh3cJn7cCLMfkEPmySTSE= github.com/cavaliercoder/grab v2.0.0+incompatible/go.mod h1:tTBkfNqSBfuMmMBFaO2phgyhdYhiZQ/+iXCZDzcDsMI= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= diff --git a/vendor/github.com/cavaliercoder/grab/.gitignore b/vendor/github.com/cavaliercoder/grab/.gitignore new file mode 100644 index 00000000..7ef37665 --- /dev/null +++ b/vendor/github.com/cavaliercoder/grab/.gitignore @@ -0,0 +1,3 @@ +# ignore IDE project files +*.iml +.idea/ diff --git a/vendor/github.com/cavaliercoder/grab/.travis.yml b/vendor/github.com/cavaliercoder/grab/.travis.yml index efcdb147..1672353d 100644 --- a/vendor/github.com/cavaliercoder/grab/.travis.yml +++ b/vendor/github.com/cavaliercoder/grab/.travis.yml @@ -1,6 +1,7 @@ language: go go: + - tip - 1.10.x - 1.9.x - 1.8.x diff --git a/vendor/github.com/cavaliercoder/grab/README.md b/vendor/github.com/cavaliercoder/grab/README.md index 5066789f..27b4dda3 100644 --- a/vendor/github.com/cavaliercoder/grab/README.md +++ b/vendor/github.com/cavaliercoder/grab/README.md @@ -70,7 +70,7 @@ Loop: case <-t.C: fmt.Printf(" transferred %v / %v bytes (%.2f%%)\n", resp.BytesComplete(), - resp.Size, + resp.Size(), 100*resp.Progress()) case <-resp.Done: diff --git a/vendor/github.com/cavaliercoder/grab/bps/bps.go b/vendor/github.com/cavaliercoder/grab/bps/bps.go new file mode 100644 index 00000000..0c70cb55 --- /dev/null +++ b/vendor/github.com/cavaliercoder/grab/bps/bps.go @@ -0,0 +1,54 @@ +/* +Package bps provides gauges for calculating the Bytes Per Second transfer rate +of data streams. +*/ +package bps + +import ( + "context" + "time" +) + +// Gauge is the common interface for all BPS gauges in this package. Given a +// set of samples over time, each gauge type can be used to measure the Bytes +// Per Second transfer rate of a data stream. +// +// All samples must monotonically increase in timestamp and value. Each sample +// should represent the total number of bytes sent in a stream, rather than +// accounting for the number sent since the last sample. +// +// To ensure a gauge can report progress as quickly as possible, take an initial +// sample when your stream first starts. +// +// All gauge implementations are safe for concurrent use. +type Gauge interface { + // Sample adds a new sample of the progress of the monitored stream. + Sample(t time.Time, n int64) + + // BPS returns the calculated Bytes Per Second rate of the monitored stream. + BPS() float64 +} + +// SampleFunc is used by Watch to take periodic samples of a monitored stream. +type SampleFunc func() (n int64) + +// Watch will periodically call the given SampleFunc to sample the progress of +// a monitored stream and update the given gauge. SampleFunc should return the +// total number of bytes transferred by the stream since it started. +// +// Watch is a blocking call and should typically be called in a new goroutine. +// To prevent the goroutine from leaking, make sure to cancel the given context +// once the stream is completed or canceled. +func Watch(ctx context.Context, g Gauge, f SampleFunc, interval time.Duration) { + g.Sample(time.Now(), f()) + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case now := <-t.C: + g.Sample(now, f()) + } + } +} diff --git a/vendor/github.com/cavaliercoder/grab/bps/sma.go b/vendor/github.com/cavaliercoder/grab/bps/sma.go new file mode 100644 index 00000000..295f057e --- /dev/null +++ b/vendor/github.com/cavaliercoder/grab/bps/sma.go @@ -0,0 +1,81 @@ +package bps + +import ( + "sync" + "time" +) + +// NewSMA returns a gauge that uses a Simple Moving Average with the given +// number of samples to measure the bytes per second of a byte stream. +// +// BPS is computed using the timestamp of the most recent and oldest sample in +// the sample buffer. When a new sample is added, the oldest sample is dropped +// if the sample count exceeds maxSamples. +// +// The gauge does not account for any latency in arrival time of new samples or +// the desired window size. Any variance in the arrival of samples will result +// in a BPS measurement that is correct for the submitted samples, but over a +// varying time window. +// +// maxSamples should be equal to 1 + (window size / sampling interval) where +// window size is the number of seconds over which the moving average is +// smoothed and sampling interval is the number of seconds between each sample. +// +// For example, if you want a five second window, sampling once per second, +// maxSamples should be 1 + 5/1 = 6. +func NewSMA(maxSamples int) Gauge { + if maxSamples < 2 { + panic("sample count must be greater than 1") + } + return &sma{ + maxSamples: uint64(maxSamples), + samples: make([]int64, maxSamples), + timestamps: make([]time.Time, maxSamples), + } +} + +type sma struct { + mu sync.Mutex + index uint64 + maxSamples uint64 + sampleCount uint64 + samples []int64 + timestamps []time.Time +} + +func (c *sma) Sample(t time.Time, n int64) { + c.mu.Lock() + defer c.mu.Unlock() + + c.timestamps[c.index] = t + c.samples[c.index] = n + c.index = (c.index + 1) % c.maxSamples + + // prevent integer overflow in sampleCount. Values greater or equal to + // maxSamples have the same semantic meaning. + c.sampleCount++ + if c.sampleCount > c.maxSamples { + c.sampleCount = c.maxSamples + } +} + +func (c *sma) BPS() float64 { + c.mu.Lock() + defer c.mu.Unlock() + + // we need two samples to start + if c.sampleCount < 2 { + return 0 + } + + // First sample is always the oldest until ring buffer first overflows + oldest := c.index + if c.sampleCount < c.maxSamples { + oldest = 0 + } + + newest := (c.index + c.maxSamples - 1) % c.maxSamples + seconds := c.timestamps[newest].Sub(c.timestamps[oldest]).Seconds() + bytes := float64(c.samples[newest] - c.samples[oldest]) + return bytes / seconds +} diff --git a/vendor/github.com/cavaliercoder/grab/client.go b/vendor/github.com/cavaliercoder/grab/client.go index 62f4257d..d960815e 100644 --- a/vendor/github.com/cavaliercoder/grab/client.go +++ b/vendor/github.com/cavaliercoder/grab/client.go @@ -4,20 +4,33 @@ import ( "bytes" "context" "fmt" + "io" "net/http" "os" "path/filepath" "sync" + "sync/atomic" "time" ) +// HTTPClient provides an interface allowing us to perform HTTP requests. +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) +} + +// truncater is a private interface allowing different response +// Writers to be truncated +type truncater interface { + Truncate(size int64) error +} + // A Client is a file download client. // // Clients are safe for concurrent use by multiple goroutines. type Client struct { // HTTPClient specifies the http.Client which will be used for communicating // with the remote server during the file transfer. - HTTPClient *http.Client + HTTPClient HTTPClient // UserAgent specifies the User-Agent string which will be set in the // headers of all requests made by this client. @@ -64,6 +77,7 @@ var DefaultClient = NewClient() func (c *Client) Do(req *Request) *Response { // cancel will be called on all code-paths via closeResponse ctx, cancel := context.WithCancel(req.Context()) + req = req.WithContext(ctx) resp := &Response{ Request: req, Start: time.Now(), @@ -189,7 +203,7 @@ func (c *Client) run(resp *Response, f stateFunc) { // // If an error occurs, the next stateFunc is closeResponse. func (c *Client) statFileInfo(resp *Response) stateFunc { - if resp.Filename == "" { + if resp.Request.NoStore || resp.Filename == "" { return c.headRequest } fi, err := os.Stat(resp.Filename) @@ -225,31 +239,39 @@ func (c *Client) validateLocal(resp *Response) stateFunc { return c.closeResponse } - // determine expected file size - size := resp.Request.Size - if size == 0 && resp.HTTPResponse != nil { - size = resp.HTTPResponse.ContentLength + // determine target file size + expectedSize := resp.Request.Size + if expectedSize == 0 && resp.HTTPResponse != nil { + expectedSize = resp.HTTPResponse.ContentLength } - if size == 0 { + + if expectedSize == 0 { + // size is either actually 0 or unknown + // if unknown, we ask the remote server + // if known to be 0, we proceed with a GET return c.headRequest } - if size == resp.fi.Size() { + if expectedSize == resp.fi.Size() { + // local file matches remote file size - wrap it up resp.DidResume = true resp.bytesResumed = resp.fi.Size() return c.checksumFile } if resp.Request.NoResume { + // local file should be overwritten return c.getRequest } - if size < resp.fi.Size() { + if expectedSize >= 0 && expectedSize < resp.fi.Size() { + // remote size is known, is smaller than local size and we want to resume resp.err = ErrBadLength return c.closeResponse } if resp.CanResume { + // set resume range on GET request resp.Request.HTTPRequest.Header.Set( "Range", fmt.Sprintf("bytes=%d-", resp.fi.Size())) @@ -265,19 +287,24 @@ func (c *Client) checksumFile(resp *Response) stateFunc { return c.closeResponse } if resp.Filename == "" { - panic("filename not set") + panic("grab: developer error: filename not set") + } + if resp.Size() < 0 { + panic("grab: developer error: size unknown") } req := resp.Request - // compare checksum + // compute checksum var sum []byte - sum, resp.err = checksum(req.Context(), resp.Filename, req.hash) + sum, resp.err = resp.checksumUnsafe() if resp.err != nil { return c.closeResponse } + + // compare checksum if !bytes.Equal(sum, req.checksum) { resp.err = ErrBadChecksum - if req.deleteOnError { + if !resp.Request.NoStore && req.deleteOnError { if err := os.Remove(resp.Filename); err != nil { // err should be os.PathError and include file path resp.err = fmt.Errorf( @@ -326,6 +353,14 @@ func (c *Client) headRequest(resp *Response) stateFunc { return c.getRequest } + // In case of redirects during HEAD, record the final URL and use it + // instead of the original URL when sending future requests. + // This way we avoid sending potentially unsupported requests to + // the original URL, e.g. "Range", since it was the final URL + // that advertised its support. + resp.Request.HTTPRequest.URL = resp.HTTPResponse.Request.URL + resp.Request.HTTPRequest.Host = resp.HTTPResponse.Request.Host + return c.readResponse } @@ -335,6 +370,8 @@ func (c *Client) getRequest(resp *Response) stateFunc { return c.closeResponse } + // TODO: check Content-Range + // check status code if !resp.Request.IgnoreBadStatusCodes { if resp.HTTPResponse.StatusCode < 200 || resp.HTTPResponse.StatusCode > 299 { @@ -348,13 +385,15 @@ func (c *Client) getRequest(resp *Response) stateFunc { func (c *Client) readResponse(resp *Response) stateFunc { if resp.HTTPResponse == nil { - panic("Response.HTTPResponse is not ready") + panic("grab: developer error: Response.HTTPResponse is nil") } // check expected size - resp.Size = resp.bytesResumed + resp.HTTPResponse.ContentLength - if resp.HTTPResponse.ContentLength > 0 && resp.Request.Size > 0 { - if resp.Request.Size != resp.Size { + resp.sizeUnsafe = resp.HTTPResponse.ContentLength + if resp.sizeUnsafe >= 0 { + // remote size is known + resp.sizeUnsafe += resp.bytesResumed + if resp.Request.Size > 0 && resp.Request.Size != resp.sizeUnsafe { resp.err = ErrBadLength return c.closeResponse } @@ -371,7 +410,7 @@ func (c *Client) readResponse(resp *Response) stateFunc { resp.Filename = filepath.Join(resp.Request.Filename, filename) } - if resp.requestMethod() == "HEAD" { + if !resp.Request.NoStore && resp.requestMethod() == "HEAD" { if resp.HTTPResponse.Header.Get("Accept-Ranges") == "bytes" { resp.CanResume = true } @@ -385,39 +424,45 @@ func (c *Client) readResponse(resp *Response) stateFunc { // // Requires that Response.Filename and resp.DidResume are already be set. func (c *Client) openWriter(resp *Response) stateFunc { - if !resp.Request.NoCreateDirectories { + if !resp.Request.NoStore && !resp.Request.NoCreateDirectories { resp.err = mkdirp(resp.Filename) if resp.err != nil { return c.closeResponse } } - // compute write flags - flag := os.O_CREATE | os.O_WRONLY - if resp.fi != nil { - if resp.DidResume { - flag = os.O_APPEND | os.O_WRONLY - } else { - flag = os.O_TRUNC | os.O_WRONLY + if resp.Request.NoStore { + resp.writer = &resp.storeBuffer + } else { + // compute write flags + flag := os.O_CREATE | os.O_WRONLY + if resp.fi != nil { + if resp.DidResume { + flag = os.O_APPEND | os.O_WRONLY + } else { + // truncate later in copyFile, if not cancelled + // by BeforeCopy hook + flag = os.O_WRONLY + } } - } - // open file - f, err := os.OpenFile(resp.Filename, flag, 0644) - if err != nil { - resp.err = err - return c.closeResponse - } - resp.writer = f + // open file + f, err := os.OpenFile(resp.Filename, flag, 0666) + if err != nil { + resp.err = err + return c.closeResponse + } + resp.writer = f - // seek to start or end - whence := os.SEEK_SET - if resp.bytesResumed > 0 { - whence = os.SEEK_END - } - _, resp.err = f.Seek(0, whence) - if resp.err != nil { - return c.closeResponse + // seek to start or end + whence := os.SEEK_SET + if resp.bytesResumed > 0 { + whence = os.SEEK_END + } + _, resp.err = f.Seek(0, whence) + if resp.err != nil { + return c.closeResponse + } } // init transfer @@ -450,24 +495,42 @@ func (c *Client) copyFile(resp *Response) stateFunc { } } + var bytesCopied int64 if resp.transfer == nil { - panic("developer error: Response.transfer is not initialized") + panic("grab: developer error: Response.transfer is nil") } - go resp.watchBps() - _, resp.err = resp.transfer.copy() + + // We waited to truncate the file in openWriter() to make sure + // the BeforeCopy didn't cancel the copy. If this was an existing + // file that is not going to be resumed, truncate the contents. + if t, ok := resp.writer.(truncater); ok && resp.fi != nil && !resp.DidResume { + t.Truncate(0) + } + + bytesCopied, resp.err = resp.transfer.copy() if resp.err != nil { return c.closeResponse } closeWriter(resp) - // set timestamp - if !resp.Request.IgnoreRemoteTime { + // set file timestamp + if !resp.Request.NoStore && !resp.Request.IgnoreRemoteTime { resp.err = setLastModified(resp.HTTPResponse, resp.Filename) if resp.err != nil { return c.closeResponse } } + // update transfer size if previously unknown + if resp.Size() < 0 { + discoveredSize := resp.bytesResumed + bytesCopied + atomic.StoreInt64(&resp.sizeUnsafe, discoveredSize) + if resp.Request.Size > 0 && resp.Request.Size != discoveredSize { + resp.err = ErrBadLength + return c.closeResponse + } + } + // run AfterCopy hook if f := resp.Request.AfterCopy; f != nil { resp.err = f(resp) @@ -480,16 +543,16 @@ func (c *Client) copyFile(resp *Response) stateFunc { } func closeWriter(resp *Response) { - if resp.writer != nil { - resp.writer.Close() - resp.writer = nil + if closer, ok := resp.writer.(io.Closer); ok { + closer.Close() } + resp.writer = nil } // close finalizes the Response func (c *Client) closeResponse(resp *Response) stateFunc { if resp.IsComplete() { - panic("response already closed") + panic("grab: developer error: response already closed") } resp.fi = nil diff --git a/vendor/github.com/cavaliercoder/grab/go.mod b/vendor/github.com/cavaliercoder/grab/go.mod new file mode 100644 index 00000000..3a423a77 --- /dev/null +++ b/vendor/github.com/cavaliercoder/grab/go.mod @@ -0,0 +1,3 @@ +module github.com/cavaliercoder/grab + +go 1.14 diff --git a/vendor/github.com/cavaliercoder/grab/request.go b/vendor/github.com/cavaliercoder/grab/request.go index 50745510..f86cfc3a 100644 --- a/vendor/github.com/cavaliercoder/grab/request.go +++ b/vendor/github.com/cavaliercoder/grab/request.go @@ -51,6 +51,11 @@ type Request struct { // completed in full, it will not be restarted. NoResume bool + // NoStore specifies that grab should not write to the local file system. + // Instead, the download will be stored in memory and accessible only via + // Response.Open or Response.Bytes. + NoStore bool + // NoCreateDirectories specifies that any missing directories in the given // Filename path should not be created automatically, if they do not already // exist. diff --git a/vendor/github.com/cavaliercoder/grab/response.go b/vendor/github.com/cavaliercoder/grab/response.go index 9a30ee3b..05bbca13 100644 --- a/vendor/github.com/cavaliercoder/grab/response.go +++ b/vendor/github.com/cavaliercoder/grab/response.go @@ -1,11 +1,13 @@ package grab import ( + "bytes" "context" "io" + "io/ioutil" "net/http" "os" - "sync" + "sync/atomic" "time" ) @@ -31,7 +33,7 @@ type Response struct { Filename string // Size specifies the total expected size of the file transfer. - Size int64 + sizeUnsafe int64 // Start specifies the time at which the file transfer started. Start time.Time @@ -70,7 +72,11 @@ type Response struct { // writer is the file handle used to write the downloaded file to local // storage - writer io.WriteCloser + writer io.Writer + + // storeBuffer receives the contents of the transfer if Request.NoStore is + // enabled. + storeBuffer bytes.Buffer // bytesCompleted specifies the number of bytes which were already // transferred before this transfer began. @@ -80,11 +86,6 @@ type Response struct { // file, tracking progress and allowing for cancelation. transfer *transfer - // bytesPerSecond specifies the number of bytes that have been transferred in - // the last 1-second window. - bytesPerSecond float64 - bytesPerSecondMu sync.Mutex - // bufferSize specifies the size in bytes of the transfer buffer. bufferSize int @@ -125,6 +126,13 @@ func (c *Response) Err() error { return c.err } +// Size returns the size of the file transfer. If the remote server does not +// specify the total size and the transfer is incomplete, the return value is +// -1. +func (c *Response) Size() int64 { + return atomic.LoadInt64(&c.sizeUnsafe) +} + // BytesComplete returns the total number of bytes which have been copied to // the destination, including any bytes that were resumed from a previous // download. @@ -132,25 +140,24 @@ func (c *Response) BytesComplete() int64 { return c.bytesResumed + c.transfer.N() } -// BytesPerSecond returns the number of bytes transferred in the last second. If -// the download is already complete, the average bytes/sec for the life of the -// download is returned. +// BytesPerSecond returns the number of bytes per second transferred using a +// simple moving average of the last five seconds. If the download is already +// complete, the average bytes/sec for the life of the download is returned. func (c *Response) BytesPerSecond() float64 { if c.IsComplete() { return float64(c.transfer.N()) / c.Duration().Seconds() } - c.bytesPerSecondMu.Lock() - defer c.bytesPerSecondMu.Unlock() - return c.bytesPerSecond + return c.transfer.BPS() } // Progress returns the ratio of total bytes that have been downloaded. Multiply // the returned value by 100 to return the percentage completed. func (c *Response) Progress() float64 { - if c.Size == 0 { + size := c.Size() + if size <= 0 { return 0 } - return float64(c.BytesComplete()) / float64(c.Size) + return float64(c.BytesComplete()) / float64(size) } // Duration returns the duration of a file transfer. If the transfer is in @@ -173,40 +180,53 @@ func (c *Response) ETA() time.Time { return c.End } bt := c.BytesComplete() - bps := c.BytesPerSecond() + bps := c.transfer.BPS() if bps == 0 { return time.Time{} } - secs := float64(c.Size-bt) / bps + secs := float64(c.Size()-bt) / bps return time.Now().Add(time.Duration(secs) * time.Second) } -// watchBps watches the progress of a transfer and maintains statistics. -func (c *Response) watchBps() { - var prev int64 - then := c.Start - - t := time.NewTicker(time.Second) - defer t.Stop() - - for { - select { - case <-c.Done: - return - - case now := <-t.C: - d := now.Sub(then) - then = now - - cur := c.transfer.N() - bs := cur - prev - prev = cur - - c.bytesPerSecondMu.Lock() - c.bytesPerSecond = float64(bs) / d.Seconds() - c.bytesPerSecondMu.Unlock() - } +// Open blocks the calling goroutine until the underlying file transfer is +// completed and then opens the transferred file for reading. If Request.NoStore +// was enabled, the reader will read from memory. +// +// If an error occurred during the transfer, it will be returned. +// +// It is the callers responsibility to close the returned file handle. +func (c *Response) Open() (io.ReadCloser, error) { + if err := c.Err(); err != nil { + return nil, err } + return c.openUnsafe() +} + +func (c *Response) openUnsafe() (io.ReadCloser, error) { + if c.Request.NoStore { + return ioutil.NopCloser(bytes.NewReader(c.storeBuffer.Bytes())), nil + } + return os.Open(c.Filename) +} + +// Bytes blocks the calling goroutine until the underlying file transfer is +// completed and then reads all bytes from the completed tranafer. If +// Request.NoStore was enabled, the bytes will be read from memory. +// +// If an error occurred during the transfer, it will be returned. +func (c *Response) Bytes() ([]byte, error) { + if err := c.Err(); err != nil { + return nil, err + } + if c.Request.NoStore { + return c.storeBuffer.Bytes(), nil + } + f, err := c.Open() + if err != nil { + return nil, err + } + defer f.Close() + return ioutil.ReadAll(f) } func (c *Response) requestMethod() string { @@ -216,6 +236,20 @@ func (c *Response) requestMethod() string { return c.HTTPResponse.Request.Method } +func (c *Response) checksumUnsafe() ([]byte, error) { + f, err := c.openUnsafe() + if err != nil { + return nil, err + } + defer f.Close() + t := newTransfer(c.Request.Context(), nil, c.Request.hash, f, nil) + if _, err = t.copy(); err != nil { + return nil, err + } + sum := c.Request.hash.Sum(nil) + return sum, nil +} + func (c *Response) closeResponseBody() error { if c.HTTPResponse == nil || c.HTTPResponse.Body == nil { return nil diff --git a/vendor/github.com/cavaliercoder/grab/transfer.go b/vendor/github.com/cavaliercoder/grab/transfer.go index 6fe20556..2bd2db3f 100644 --- a/vendor/github.com/cavaliercoder/grab/transfer.go +++ b/vendor/github.com/cavaliercoder/grab/transfer.go @@ -4,30 +4,42 @@ import ( "context" "io" "sync/atomic" + "time" + + "github.com/cavaliercoder/grab/bps" ) type transfer struct { - n int64 // must be 64bit aligned on 386 - ctx context.Context - lim RateLimiter - w io.Writer - r io.Reader - b []byte + n int64 // must be 64bit aligned on 386 + ctx context.Context + gauge bps.Gauge + lim RateLimiter + w io.Writer + r io.Reader + b []byte } func newTransfer(ctx context.Context, lim RateLimiter, dst io.Writer, src io.Reader, buf []byte) *transfer { return &transfer{ - ctx: ctx, - lim: lim, - w: dst, - r: src, - b: buf, + ctx: ctx, + gauge: bps.NewSMA(6), // five second moving average sampling every second + lim: lim, + w: dst, + r: src, + b: buf, } } // copy behaves similarly to io.CopyBuffer except that it checks for cancelation -// of the given context.Context and reports progress in a thread-safe manner. +// of the given context.Context, reports progress in a thread-safe manner and +// tracks the transfer rate. func (c *transfer) copy() (written int64, err error) { + // maintain a bps gauge in another goroutine + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + go bps.Watch(ctx, c.gauge, c.N, time.Second) + + // start the transfer if c.b == nil { c.b = make([]byte, 32*1024) } @@ -39,12 +51,6 @@ func (c *transfer) copy() (written int64, err error) { default: // keep working } - if c.lim != nil { - err = c.lim.WaitN(c.ctx, len(c.b)) - if err != nil { - return - } - } nr, er := c.r.Read(c.b) if nr > 0 { nw, ew := c.w.Write(c.b[0:nr]) @@ -60,6 +66,13 @@ func (c *transfer) copy() (written int64, err error) { err = io.ErrShortWrite break } + // wait for rate limiter + if c.lim != nil { + err = c.lim.WaitN(c.ctx, nr) + if err != nil { + return + } + } } if er != nil { if er != io.EOF { @@ -79,3 +92,12 @@ func (c *transfer) N() (n int64) { n = atomic.LoadInt64(&c.n) return } + +// BPS returns the current bytes per second transfer rate using a simple moving +// average. +func (c *transfer) BPS() (bps float64) { + if c == nil || c.gauge == nil { + return 0 + } + return c.gauge.BPS() +} diff --git a/vendor/github.com/cavaliercoder/grab/util.go b/vendor/github.com/cavaliercoder/grab/util.go index 1890fe3a..2491723f 100644 --- a/vendor/github.com/cavaliercoder/grab/util.go +++ b/vendor/github.com/cavaliercoder/grab/util.go @@ -1,9 +1,7 @@ package grab import ( - "context" "fmt" - "hash" "mime" "net/http" "os" @@ -36,22 +34,26 @@ func mkdirp(path string) error { if !os.IsNotExist(err) { return fmt.Errorf("error checking destination directory: %v", err) } - if err := os.MkdirAll(dir, 0755); err != nil { + if err := os.MkdirAll(dir, 0777); err != nil { return fmt.Errorf("error creating destination directory: %v", err) } } else if !fi.IsDir() { - panic("destination path is not directory") + panic("grab: developer error: destination path is not directory") } return nil } // guessFilename returns a filename for the given http.Response. If none can be // determined ErrNoFilename is returned. +// +// TODO: NoStore operations should not require a filename func guessFilename(resp *http.Response) (string, error) { filename := resp.Request.URL.Path if cd := resp.Header.Get("Content-Disposition"); cd != "" { if _, params, err := mime.ParseMediaType(cd); err == nil { - filename = params["filename"] + if val, ok := params["filename"]; ok { + filename = val + } // else filename directive is missing.. fallback to URL.Path } } @@ -67,23 +69,3 @@ func guessFilename(resp *http.Response) (string, error) { return filename, nil } - -// checksum returns a hash of the given file, using the given hash algorithm. -func checksum(ctx context.Context, filename string, h hash.Hash) (b []byte, err error) { - var f *os.File - f, err = os.Open(filename) - if err != nil { - return - } - defer func() { - err = f.Close() - }() - - t := newTransfer(ctx, nil, h, f, nil) - if _, err = t.copy(); err != nil { - return - } - - b = h.Sum(nil) - return -} diff --git a/vendor/modules.txt b/vendor/modules.txt index dfe9145a..ce9b76f3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -44,8 +44,9 @@ github.com/asdine/storm/internal github.com/asdine/storm/q # github.com/briandowns/spinner v1.7.0 github.com/briandowns/spinner -# github.com/cavaliercoder/grab v2.0.0+incompatible +# github.com/cavaliercoder/grab v1.0.1-0.20201108051000-98a5bfe305ec github.com/cavaliercoder/grab +github.com/cavaliercoder/grab/bps # github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9 github.com/chuckpreslar/emission # github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0