Drop link to moby fork

This commit is contained in:
Ettore Di Giacinto
2021-10-24 17:16:36 +02:00
parent c897bffdfc
commit 67a07e7c5a
116 changed files with 8027 additions and 3184 deletions

View File

@@ -11,6 +11,7 @@ import (
"hash"
"hash/crc32"
"io"
"runtime"
"sync"
"time"
@@ -18,9 +19,9 @@ import (
)
const (
defaultBlockSize = 256 << 10
defaultBlockSize = 1 << 20
tailSize = 16384
defaultBlocks = 16
defaultBlocks = 4
)
// These constants are copied from the flate package, so that code that imports
@@ -68,8 +69,8 @@ type result struct {
// With this you can control the approximate size of your blocks,
// as well as how many you want to be processing in parallel.
//
// Default values for this is SetConcurrency(250000, 16),
// meaning blocks are split at 250000 bytes and up to 16 blocks
// Default values for this is SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)),
// meaning blocks are split at 1 MB and up to the number of CPU threads
// can be processing at once before the writer blocks.
func (z *Writer) SetConcurrency(blockSize, blocks int) error {
if blockSize <= tailSize {
@@ -84,7 +85,7 @@ func (z *Writer) SetConcurrency(blockSize, blocks int) error {
z.blockSize = blockSize
z.results = make(chan result, blocks)
z.blocks = blocks
z.dstPool = sync.Pool{New: func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) }}
z.dstPool.New = func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) }
return nil
}
@@ -115,7 +116,7 @@ func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
}
z := new(Writer)
z.SetConcurrency(defaultBlockSize, defaultBlocks)
z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
z.init(w, level)
return z, nil
}
@@ -174,7 +175,7 @@ func (z *Writer) Reset(w io.Writer) {
if z.results != nil && !z.closed {
close(z.results)
}
z.SetConcurrency(defaultBlockSize, defaultBlocks)
z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
z.init(w, z.level)
}
@@ -239,36 +240,36 @@ func (z *Writer) writeString(s string) (err error) {
// compressCurrent will compress the data currently buffered
// This should only be called from the main writer/flush/closer
func (z *Writer) compressCurrent(flush bool) {
c := z.currentBuffer
if len(c) > z.blockSize {
// This can never happen through the public interface.
panic("len(z.currentBuffer) > z.blockSize (most likely due to concurrent Write race)")
}
r := result{}
r.result = make(chan []byte, 1)
r.notifyWritten = make(chan struct{}, 0)
// Reserve a result slot
select {
case z.results <- r:
case <-z.pushedErr:
return
}
// If block given is more than twice the block size, split it.
c := z.currentBuffer
if len(c) > z.blockSize*2 {
c = c[:z.blockSize]
z.wg.Add(1)
go z.compressBlock(c, z.prevTail, r, false)
z.prevTail = c[len(c)-tailSize:]
z.currentBuffer = z.currentBuffer[z.blockSize:]
z.compressCurrent(flush)
// Last one flushes if needed
return
}
z.wg.Add(1)
go z.compressBlock(c, z.prevTail, r, z.closed)
tail := z.prevTail
if len(c) > tailSize {
z.prevTail = c[len(c)-tailSize:]
buf := z.dstPool.Get().([]byte) // Put in .compressBlock
// Copy tail from current buffer before handing the buffer over to the
// compressBlock goroutine.
buf = append(buf[:0], c[len(c)-tailSize:]...)
z.prevTail = buf
} else {
z.prevTail = nil
}
z.currentBuffer = z.dstPool.Get().([]byte)
go z.compressBlock(c, tail, r, z.closed)
z.currentBuffer = z.dstPool.Get().([]byte) // Put in .compressBlock
z.currentBuffer = z.currentBuffer[:0]
// Wait if flushing
@@ -358,29 +359,37 @@ func (z *Writer) Write(p []byte) (int, error) {
// Start receiving data from compressors
go func() {
listen := z.results
var failed bool
for {
r, ok := <-listen
// If closed, we are finished.
if !ok {
return
}
if failed {
close(r.notifyWritten)
continue
}
buf := <-r.result
n, err := z.w.Write(buf)
if err != nil {
z.pushError(err)
close(r.notifyWritten)
return
failed = true
continue
}
if n != len(buf) {
z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf)))
failed = true
close(r.notifyWritten)
return
continue
}
z.dstPool.Put(buf)
close(r.notifyWritten)
}
}()
z.currentBuffer = make([]byte, 0, z.blockSize)
z.currentBuffer = z.dstPool.Get().([]byte)
z.currentBuffer = z.currentBuffer[:0]
}
q := p
for len(q) > 0 {
@@ -390,10 +399,13 @@ func (z *Writer) Write(p []byte) (int, error) {
}
z.digest.Write(q[:length])
z.currentBuffer = append(z.currentBuffer, q[:length]...)
if len(z.currentBuffer) >= z.blockSize {
if len(z.currentBuffer) > z.blockSize {
panic("z.currentBuffer too large (most likely due to concurrent Write race)")
}
if len(z.currentBuffer) == z.blockSize {
z.compressCurrent(false)
if err := z.checkError(); err != nil {
return len(p) - len(q) - length, err
return len(p) - len(q), err
}
}
z.size += length
@@ -410,12 +422,13 @@ func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) {
close(r.result)
z.wg.Done()
}()
buf := z.dstPool.Get().([]byte)
buf := z.dstPool.Get().([]byte) // Corresponding Put in .Write's result writer
dest := bytes.NewBuffer(buf[:0])
compressor := z.dictFlatePool.Get().(*flate.Writer)
compressor := z.dictFlatePool.Get().(*flate.Writer) // Put below
compressor.ResetDict(dest, prevTail)
compressor.Write(p)
z.dstPool.Put(p) // Corresponding Get in .Write and .compressCurrent
err := compressor.Flush()
if err != nil {
@@ -429,7 +442,12 @@ func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) {
return
}
}
z.dictFlatePool.Put(compressor)
z.dictFlatePool.Put(compressor) // Get above
if prevTail != nil {
z.dstPool.Put(prevTail) // Get in .compressCurrent
}
// Read back buffer
buf = dest.Bytes()
r.result <- buf