bump github.com/moby/buildkit to v0.13.0 (#351)

* bump github.com/moby/buildkit to v0.13.0

Signed-off-by: Nianyu Shen <nianyu@spectrocloud.com>

* fix: update dep usage based on newer version

Signed-off-by: Nianyu Shen <nianyu@spectrocloud.com>

* remove empty line

Signed-off-by: Nianyu Shen <nianyu@spectrocloud.com>

* ci: bump golang to 1.21.x

* Bump moby

* debug

---------

Signed-off-by: Nianyu Shen <nianyu@spectrocloud.com>
Co-authored-by: Nianyu Shen <nianyu@spectrocloud.com>
This commit is contained in:
Ettore Di Giacinto
2024-03-15 09:26:32 +01:00
committed by GitHub
parent c47bf4833a
commit 4c788ccbd1
1779 changed files with 127547 additions and 71408 deletions

View File

@@ -5,7 +5,6 @@
package zstd
import (
"bytes"
"context"
"encoding/binary"
"io"
@@ -35,13 +34,13 @@ type Decoder struct {
br readerWrapper
enabled bool
inFrame bool
dstBuf []byte
}
frame *frameDec
// Custom dictionaries.
// Always uses copies.
dicts map[uint32]dict
dicts map[uint32]*dict
// streamWg is the waitgroup for all streams
streamWg sync.WaitGroup
@@ -103,7 +102,7 @@ func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
}
// Transfer option dicts.
d.dicts = make(map[uint32]dict, len(d.o.dicts))
d.dicts = make(map[uint32]*dict, len(d.o.dicts))
for _, dc := range d.o.dicts {
d.dicts[dc.id] = dc
}
@@ -187,21 +186,23 @@ func (d *Decoder) Reset(r io.Reader) error {
}
// If bytes buffer and < 5MB, do sync decoding anyway.
if bb, ok := r.(byter); ok && bb.Len() < 5<<20 {
if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap {
bb2 := bb
if debugDecoder {
println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
}
b := bb2.Bytes()
var dst []byte
if cap(d.current.b) > 0 {
dst = d.current.b
if cap(d.syncStream.dstBuf) > 0 {
dst = d.syncStream.dstBuf[:0]
}
dst, err := d.DecodeAll(b, dst[:0])
dst, err := d.DecodeAll(b, dst)
if err == nil {
err = io.EOF
}
// Save output buffer
d.syncStream.dstBuf = dst
d.current.b = dst
d.current.err = err
d.current.flushed = true
@@ -216,6 +217,7 @@ func (d *Decoder) Reset(r io.Reader) error {
d.current.err = nil
d.current.flushed = false
d.current.d = nil
d.syncStream.dstBuf = nil
// Ensure no-one else is still running...
d.streamWg.Wait()
@@ -312,6 +314,7 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
// Grab a block decoder and frame decoder.
block := <-d.decoders
frame := block.localFrame
initialSize := len(dst)
defer func() {
if debugDecoder {
printf("re-adding decoder: %p", block)
@@ -337,29 +340,36 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
}
return dst, err
}
if frame.DictionaryID != nil {
dict, ok := d.dicts[*frame.DictionaryID]
if !ok {
return nil, ErrUnknownDictionary
}
if err = d.setDict(frame); err != nil {
return nil, err
}
if frame.WindowSize > d.o.maxWindowSize {
if debugDecoder {
println("setting dict", frame.DictionaryID)
println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
}
frame.history.setDict(&dict)
return dst, ErrWindowSizeExceeded
}
if frame.FrameContentSize != fcsUnknown && frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
return dst, ErrDecoderSizeExceeded
}
if frame.FrameContentSize < 1<<30 {
// Never preallocate more than 1 GB up front.
if frame.FrameContentSize != fcsUnknown {
if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)-initialSize) {
if debugDecoder {
println("decoder size exceeded; fcs:", frame.FrameContentSize, "> mcs:", d.o.maxDecodedSize-uint64(len(dst)-initialSize), "len:", len(dst))
}
return dst, ErrDecoderSizeExceeded
}
if d.o.limitToCap && frame.FrameContentSize > uint64(cap(dst)-len(dst)) {
if debugDecoder {
println("decoder size exceeded; fcs:", frame.FrameContentSize, "> (cap-len)", cap(dst)-len(dst))
}
return dst, ErrDecoderSizeExceeded
}
if cap(dst)-len(dst) < int(frame.FrameContentSize) {
dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
copy(dst2, dst)
dst = dst2
}
}
if cap(dst) == 0 {
if cap(dst) == 0 && !d.o.limitToCap {
// Allocate len(input) * 2 by default if nothing is provided
// and we didn't get frame content size.
size := len(input) * 2
@@ -377,6 +387,9 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
if err != nil {
return dst, err
}
if uint64(len(dst)-initialSize) > d.o.maxDecodedSize {
return dst, ErrDecoderSizeExceeded
}
if len(frame.bBuf) == 0 {
if debugDecoder {
println("frame dbuf empty")
@@ -437,26 +450,23 @@ func (d *Decoder) nextBlock(blocking bool) (ok bool) {
println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
}
if len(next.b) > 0 {
n, err := d.current.crc.Write(next.b)
if err == nil {
if n != len(next.b) {
d.current.err = io.ErrShortWrite
}
}
if d.o.ignoreChecksum {
return true
}
if next.err == nil && next.d != nil && len(next.d.checkCRC) != 0 {
got := d.current.crc.Sum64()
var tmp [4]byte
binary.LittleEndian.PutUint32(tmp[:], uint32(got))
if !bytes.Equal(tmp[:], next.d.checkCRC) && !ignoreCRC {
if len(next.b) > 0 {
d.current.crc.Write(next.b)
}
if next.err == nil && next.d != nil && next.d.hasCRC {
got := uint32(d.current.crc.Sum64())
if got != next.d.checkCRC {
if debugDecoder {
println("CRC Check Failed:", tmp[:], " (got) !=", next.d.checkCRC, "(on stream)")
printf("CRC Check Failed: %08x (got) != %08x (on stream)\n", got, next.d.checkCRC)
}
d.current.err = ErrCRCMismatch
} else {
if debugDecoder {
println("CRC ok", tmp[:])
printf("CRC ok %08x\n", got)
}
}
}
@@ -472,18 +482,12 @@ func (d *Decoder) nextBlockSync() (ok bool) {
if !d.syncStream.inFrame {
d.frame.history.reset()
d.current.err = d.frame.reset(&d.syncStream.br)
if d.current.err == nil {
d.current.err = d.setDict(d.frame)
}
if d.current.err != nil {
return false
}
if d.frame.DictionaryID != nil {
dict, ok := d.dicts[*d.frame.DictionaryID]
if !ok {
d.current.err = ErrUnknownDictionary
return false
} else {
d.frame.history.setDict(&dict)
}
}
if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
d.current.err = ErrDecoderSizeExceeded
return false
@@ -533,9 +537,15 @@ func (d *Decoder) nextBlockSync() (ok bool) {
// Update/Check CRC
if d.frame.HasCheckSum {
d.frame.crc.Write(d.current.b)
if !d.o.ignoreChecksum {
d.frame.crc.Write(d.current.b)
}
if d.current.d.Last {
d.current.err = d.frame.checkCRC()
if !d.o.ignoreChecksum {
d.current.err = d.frame.checkCRC()
} else {
d.current.err = d.frame.consumeCRC()
}
if d.current.err != nil {
println("CRC error:", d.current.err)
return false
@@ -629,60 +639,18 @@ func (d *Decoder) startSyncDecoder(r io.Reader) error {
// Create Decoder:
// ASYNC:
// Spawn 4 go routines.
// 0: Read frames and decode blocks.
// 1: Decode block and literals. Receives hufftree and seqdecs, returns seqdecs and huff tree.
// 2: Wait for recentOffsets if needed. Decode sequences, send recentOffsets.
// 3: Wait for stream history, execute sequences, send stream history.
// Spawn 3 go routines.
// 0: Read frames and decode block literals.
// 1: Decode sequences.
// 2: Execute sequences, send to output.
func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
defer d.streamWg.Done()
br := readerWrapper{r: r}
var seqPrepare = make(chan *blockDec, d.o.concurrent)
var seqDecode = make(chan *blockDec, d.o.concurrent)
var seqExecute = make(chan *blockDec, d.o.concurrent)
// Async 1: Prepare blocks...
go func() {
var hist history
var hasErr bool
for block := range seqPrepare {
if hasErr {
if block != nil {
seqDecode <- block
}
continue
}
if block.async.newHist != nil {
if debugDecoder {
println("Async 1: new history")
}
hist.reset()
if block.async.newHist.dict != nil {
hist.setDict(block.async.newHist.dict)
}
}
if block.err != nil || block.Type != blockTypeCompressed {
hasErr = block.err != nil
seqDecode <- block
continue
}
remain, err := block.decodeLiterals(block.data, &hist)
block.err = err
hasErr = block.err != nil
if err == nil {
block.async.literals = hist.decoders.literals
block.async.seqData = remain
} else if debugDecoder {
println("decodeLiterals error:", err)
}
seqDecode <- block
}
close(seqDecode)
}()
// Async 2: Decode sequences...
// Async 1: Decode sequences...
go func() {
var hist history
var hasErr bool
@@ -696,8 +664,9 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
}
if block.async.newHist != nil {
if debugDecoder {
println("Async 2: new history, recent:", block.async.newHist.recentOffsets)
println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
}
hist.reset()
hist.decoders = block.async.newHist.decoders
hist.recentOffsets = block.async.newHist.recentOffsets
hist.windowSize = block.async.newHist.windowSize
@@ -729,6 +698,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
seqExecute <- block
}
close(seqExecute)
hist.reset()
}()
var wg sync.WaitGroup
@@ -750,8 +720,9 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
}
if block.async.newHist != nil {
if debugDecoder {
println("Async 3: new history")
println("Async 2: new history")
}
hist.reset()
hist.windowSize = block.async.newHist.windowSize
hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
if block.async.newHist.dict != nil {
@@ -781,7 +752,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
if block.lowMem {
block.dst = make([]byte, block.RLESize)
} else {
block.dst = make([]byte, maxBlockSize)
block.dst = make([]byte, maxCompressedBlockSize)
}
}
block.dst = block.dst[:block.RLESize]
@@ -833,10 +804,38 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
if debugDecoder {
println("decoder goroutines finished")
}
hist.reset()
}()
var hist history
decodeStream:
for {
var hasErr bool
hist.reset()
decodeBlock := func(block *blockDec) {
if hasErr {
if block != nil {
seqDecode <- block
}
return
}
if block.err != nil || block.Type != blockTypeCompressed {
hasErr = block.err != nil
seqDecode <- block
return
}
remain, err := block.decodeLiterals(block.data, &hist)
block.err = err
hasErr = block.err != nil
if err == nil {
block.async.literals = hist.decoders.literals
block.async.seqData = remain
} else if debugDecoder {
println("decodeLiterals error:", err)
}
seqDecode <- block
}
frame := d.frame
if debugDecoder {
println("New frame...")
@@ -847,15 +846,14 @@ decodeStream:
if debugDecoder && err != nil {
println("Frame decoder returned", err)
}
if err == nil && frame.DictionaryID != nil {
dict, ok := d.dicts[*frame.DictionaryID]
if !ok {
err = ErrUnknownDictionary
} else {
frame.history.setDict(&dict)
}
if err == nil {
err = d.setDict(frame)
}
if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
if debugDecoder {
println("decoder size exceeded, fws:", d.frame.WindowSize, "> mws:", d.o.maxWindowSize)
}
err = ErrDecoderSizeExceeded
}
if err != nil {
@@ -863,7 +861,7 @@ decodeStream:
case <-ctx.Done():
case dec := <-d.decoders:
dec.sendErr(err)
seqPrepare <- dec
decodeBlock(dec)
}
break decodeStream
}
@@ -883,6 +881,10 @@ decodeStream:
if debugDecoder {
println("Alloc History:", h.allocFrameBuffer)
}
hist.reset()
if h.dict != nil {
hist.setDict(h.dict)
}
dec.async.newHist = &h
dec.async.fcs = frame.FrameContentSize
historySent = true
@@ -893,23 +895,27 @@ decodeStream:
println("next block returned error:", err)
}
dec.err = err
dec.checkCRC = nil
dec.hasCRC = false
if dec.Last && frame.HasCheckSum && err == nil {
crc, err := frame.rawInput.readSmall(4)
if err != nil {
if len(crc) < 4 {
if err == nil {
err = io.ErrUnexpectedEOF
}
println("CRC missing?", err)
dec.err = err
}
var tmp [4]byte
copy(tmp[:], crc)
dec.checkCRC = tmp[:]
if debugDecoder {
println("found crc to check:", dec.checkCRC)
} else {
dec.checkCRC = binary.LittleEndian.Uint32(crc)
dec.hasCRC = true
if debugDecoder {
printf("found crc to check: %08x\n", dec.checkCRC)
}
}
}
err = dec.err
last := dec.Last
seqPrepare <- dec
decodeBlock(dec)
if err != nil {
break decodeStream
}
@@ -918,7 +924,25 @@ decodeStream:
}
}
}
close(seqPrepare)
close(seqDecode)
wg.Wait()
hist.reset()
d.frame.history.b = frameHistCache
}
func (d *Decoder) setDict(frame *frameDec) (err error) {
dict, ok := d.dicts[frame.DictionaryID]
if ok {
if debugDecoder {
println("setting dict", frame.DictionaryID)
}
frame.history.setDict(dict)
} else if frame.DictionaryID != 0 {
// A zero or missing dictionary id is ambiguous:
// either dictionary zero, or no dictionary. In particular,
// zstd --patch-from uses this id for the source file,
// so only return an error if the dictionary id is not zero.
err = ErrUnknownDictionary
}
return err
}