mirror of
https://github.com/mudler/luet.git
synced 2025-09-09 19:19:49 +00:00
Support priv/unpriv image extraction
Optionally add back privileged extraction which can be enabled with LUET_PRIVILEGED_EXTRACT=true Signed-off-by: Ettore Di Giacinto <mudler@sabayon.org>
This commit is contained in:
410
vendor/github.com/moby/buildkit/solver/scheduler.go
generated
vendored
Normal file
410
vendor/github.com/moby/buildkit/solver/scheduler.go
generated
vendored
Normal file
@@ -0,0 +1,410 @@
|
||||
package solver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/moby/buildkit/solver/internal/pipe"
|
||||
"github.com/moby/buildkit/util/cond"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var debugScheduler = false // TODO: replace with logs in build trace
|
||||
|
||||
func init() {
|
||||
if os.Getenv("BUILDKIT_SCHEDULER_DEBUG") == "1" {
|
||||
debugScheduler = true
|
||||
}
|
||||
}
|
||||
|
||||
func newScheduler(ef edgeFactory) *scheduler {
|
||||
s := &scheduler{
|
||||
waitq: map[*edge]struct{}{},
|
||||
incoming: map[*edge][]*edgePipe{},
|
||||
outgoing: map[*edge][]*edgePipe{},
|
||||
|
||||
stopped: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
|
||||
ef: ef,
|
||||
}
|
||||
s.cond = cond.NewStatefulCond(&s.mu)
|
||||
|
||||
go s.loop()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
type dispatcher struct {
|
||||
next *dispatcher
|
||||
e *edge
|
||||
}
|
||||
|
||||
type scheduler struct {
|
||||
cond *cond.StatefulCond
|
||||
mu sync.Mutex
|
||||
muQ sync.Mutex
|
||||
|
||||
ef edgeFactory
|
||||
|
||||
waitq map[*edge]struct{}
|
||||
next *dispatcher
|
||||
last *dispatcher
|
||||
stopped chan struct{}
|
||||
stoppedOnce sync.Once
|
||||
closed chan struct{}
|
||||
|
||||
incoming map[*edge][]*edgePipe
|
||||
outgoing map[*edge][]*edgePipe
|
||||
}
|
||||
|
||||
func (s *scheduler) Stop() {
|
||||
s.stoppedOnce.Do(func() {
|
||||
close(s.stopped)
|
||||
})
|
||||
<-s.closed
|
||||
}
|
||||
|
||||
func (s *scheduler) loop() {
|
||||
defer func() {
|
||||
close(s.closed)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
<-s.stopped
|
||||
s.mu.Lock()
|
||||
s.cond.Signal()
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
|
||||
s.mu.Lock()
|
||||
for {
|
||||
select {
|
||||
case <-s.stopped:
|
||||
s.mu.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
s.muQ.Lock()
|
||||
l := s.next
|
||||
if l != nil {
|
||||
if l == s.last {
|
||||
s.last = nil
|
||||
}
|
||||
s.next = l.next
|
||||
delete(s.waitq, l.e)
|
||||
}
|
||||
s.muQ.Unlock()
|
||||
if l == nil {
|
||||
s.cond.Wait()
|
||||
continue
|
||||
}
|
||||
s.dispatch(l.e)
|
||||
}
|
||||
}
|
||||
|
||||
// dispatch schedules an edge to be processed
|
||||
func (s *scheduler) dispatch(e *edge) {
|
||||
inc := make([]pipe.Sender, len(s.incoming[e]))
|
||||
for i, p := range s.incoming[e] {
|
||||
inc[i] = p.Sender
|
||||
}
|
||||
out := make([]pipe.Receiver, len(s.outgoing[e]))
|
||||
for i, p := range s.outgoing[e] {
|
||||
out[i] = p.Receiver
|
||||
}
|
||||
|
||||
e.hasActiveOutgoing = false
|
||||
updates := []pipe.Receiver{}
|
||||
for _, p := range out {
|
||||
if ok := p.Receive(); ok {
|
||||
updates = append(updates, p)
|
||||
}
|
||||
if !p.Status().Completed {
|
||||
e.hasActiveOutgoing = true
|
||||
}
|
||||
}
|
||||
|
||||
pf := &pipeFactory{s: s, e: e}
|
||||
|
||||
// unpark the edge
|
||||
debugSchedulerPreUnpark(e, inc, updates, out)
|
||||
e.unpark(inc, updates, out, pf)
|
||||
debugSchedulerPostUnpark(e, inc)
|
||||
|
||||
postUnpark:
|
||||
// set up new requests that didn't complete/were added by this run
|
||||
openIncoming := make([]*edgePipe, 0, len(inc))
|
||||
for _, r := range s.incoming[e] {
|
||||
if !r.Sender.Status().Completed {
|
||||
openIncoming = append(openIncoming, r)
|
||||
}
|
||||
}
|
||||
if len(openIncoming) > 0 {
|
||||
s.incoming[e] = openIncoming
|
||||
} else {
|
||||
delete(s.incoming, e)
|
||||
}
|
||||
|
||||
openOutgoing := make([]*edgePipe, 0, len(out))
|
||||
for _, r := range s.outgoing[e] {
|
||||
if !r.Receiver.Status().Completed {
|
||||
openOutgoing = append(openOutgoing, r)
|
||||
}
|
||||
}
|
||||
if len(openOutgoing) > 0 {
|
||||
s.outgoing[e] = openOutgoing
|
||||
} else {
|
||||
delete(s.outgoing, e)
|
||||
}
|
||||
|
||||
// if keys changed there might be possiblity for merge with other edge
|
||||
if e.keysDidChange {
|
||||
if k := e.currentIndexKey(); k != nil {
|
||||
// skip this if not at least 1 key per dep
|
||||
origEdge := e.index.LoadOrStore(k, e)
|
||||
if origEdge != nil {
|
||||
logrus.Debugf("merging edge %s to %s\n", e.edge.Vertex.Name(), origEdge.edge.Vertex.Name())
|
||||
if s.mergeTo(origEdge, e) {
|
||||
s.ef.setEdge(e.edge, origEdge)
|
||||
}
|
||||
}
|
||||
}
|
||||
e.keysDidChange = false
|
||||
}
|
||||
|
||||
// validation to avoid deadlocks/resource leaks:
|
||||
// TODO: if these start showing up in error reports they can be changed
|
||||
// to error the edge instead. They can only appear from algorithm bugs in
|
||||
// unpark(), not for any external input.
|
||||
if len(openIncoming) > 0 && len(openOutgoing) == 0 {
|
||||
e.markFailed(pf, errors.New("buildkit scheduler error: return leaving incoming open. Please report this with BUILDKIT_SCHEDULER_DEBUG=1"))
|
||||
goto postUnpark
|
||||
}
|
||||
if len(openIncoming) == 0 && len(openOutgoing) > 0 {
|
||||
e.markFailed(pf, errors.New("buildkit scheduler error: return leaving outgoing open. Please report this with BUILDKIT_SCHEDULER_DEBUG=1"))
|
||||
goto postUnpark
|
||||
}
|
||||
}
|
||||
|
||||
// signal notifies that an edge needs to be processed again
|
||||
func (s *scheduler) signal(e *edge) {
|
||||
s.muQ.Lock()
|
||||
if _, ok := s.waitq[e]; !ok {
|
||||
d := &dispatcher{e: e}
|
||||
if s.last == nil {
|
||||
s.next = d
|
||||
} else {
|
||||
s.last.next = d
|
||||
}
|
||||
s.last = d
|
||||
s.waitq[e] = struct{}{}
|
||||
s.cond.Signal()
|
||||
}
|
||||
s.muQ.Unlock()
|
||||
}
|
||||
|
||||
// build evaluates edge into a result
|
||||
func (s *scheduler) build(ctx context.Context, edge Edge) (CachedResult, error) {
|
||||
s.mu.Lock()
|
||||
e := s.ef.getEdge(edge)
|
||||
if e == nil {
|
||||
s.mu.Unlock()
|
||||
return nil, errors.Errorf("invalid request %v for build", edge)
|
||||
}
|
||||
|
||||
wait := make(chan struct{})
|
||||
|
||||
var p *pipe.Pipe
|
||||
p = s.newPipe(e, nil, pipe.Request{Payload: &edgeRequest{desiredState: edgeStatusComplete}})
|
||||
p.OnSendCompletion = func() {
|
||||
p.Receiver.Receive()
|
||||
if p.Receiver.Status().Completed {
|
||||
close(wait)
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
p.Receiver.Cancel()
|
||||
}()
|
||||
|
||||
<-wait
|
||||
|
||||
if err := p.Receiver.Status().Err; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.Receiver.Status().Value.(*edgeState).result.Clone(), nil
|
||||
}
|
||||
|
||||
// newPipe creates a new request pipe between two edges
|
||||
func (s *scheduler) newPipe(target, from *edge, req pipe.Request) *pipe.Pipe {
|
||||
p := &edgePipe{
|
||||
Pipe: pipe.New(req),
|
||||
Target: target,
|
||||
From: from,
|
||||
}
|
||||
|
||||
s.signal(target)
|
||||
if from != nil {
|
||||
p.OnSendCompletion = func() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
s.signal(p.From)
|
||||
}
|
||||
s.outgoing[from] = append(s.outgoing[from], p)
|
||||
}
|
||||
s.incoming[target] = append(s.incoming[target], p)
|
||||
p.OnReceiveCompletion = func() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
s.signal(p.Target)
|
||||
}
|
||||
return p.Pipe
|
||||
}
|
||||
|
||||
// newRequestWithFunc creates a new request pipe that invokes a async function
|
||||
func (s *scheduler) newRequestWithFunc(e *edge, f func(context.Context) (interface{}, error)) pipe.Receiver {
|
||||
pp, start := pipe.NewWithFunction(f)
|
||||
p := &edgePipe{
|
||||
Pipe: pp,
|
||||
From: e,
|
||||
}
|
||||
p.OnSendCompletion = func() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
s.signal(p.From)
|
||||
}
|
||||
s.outgoing[e] = append(s.outgoing[e], p)
|
||||
go start()
|
||||
return p.Receiver
|
||||
}
|
||||
|
||||
// mergeTo merges the state from one edge to another. source edge is discarded.
|
||||
func (s *scheduler) mergeTo(target, src *edge) bool {
|
||||
if !target.edge.Vertex.Options().IgnoreCache && src.edge.Vertex.Options().IgnoreCache {
|
||||
return false
|
||||
}
|
||||
for _, inc := range s.incoming[src] {
|
||||
inc.mu.Lock()
|
||||
inc.Target = target
|
||||
s.incoming[target] = append(s.incoming[target], inc)
|
||||
inc.mu.Unlock()
|
||||
}
|
||||
|
||||
for _, out := range s.outgoing[src] {
|
||||
out.mu.Lock()
|
||||
out.From = target
|
||||
s.outgoing[target] = append(s.outgoing[target], out)
|
||||
out.mu.Unlock()
|
||||
out.Receiver.Cancel()
|
||||
}
|
||||
|
||||
delete(s.incoming, src)
|
||||
delete(s.outgoing, src)
|
||||
s.signal(target)
|
||||
|
||||
for i, d := range src.deps {
|
||||
for _, k := range d.keys {
|
||||
target.secondaryExporters = append(target.secondaryExporters, expDep{i, CacheKeyWithSelector{CacheKey: k, Selector: src.cacheMap.Deps[i].Selector}})
|
||||
}
|
||||
if d.slowCacheKey != nil {
|
||||
target.secondaryExporters = append(target.secondaryExporters, expDep{i, CacheKeyWithSelector{CacheKey: *d.slowCacheKey}})
|
||||
}
|
||||
if d.result != nil {
|
||||
for _, dk := range d.result.CacheKeys() {
|
||||
target.secondaryExporters = append(target.secondaryExporters, expDep{i, CacheKeyWithSelector{CacheKey: dk, Selector: src.cacheMap.Deps[i].Selector}})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(tonistiigi): merge cache providers
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// edgeFactory allows access to the edges from a shared graph
|
||||
type edgeFactory interface {
|
||||
getEdge(Edge) *edge
|
||||
setEdge(Edge, *edge)
|
||||
}
|
||||
|
||||
type pipeFactory struct {
|
||||
e *edge
|
||||
s *scheduler
|
||||
}
|
||||
|
||||
func (pf *pipeFactory) NewInputRequest(ee Edge, req *edgeRequest) pipe.Receiver {
|
||||
target := pf.s.ef.getEdge(ee)
|
||||
if target == nil {
|
||||
panic("failed to get edge") // TODO: return errored pipe
|
||||
}
|
||||
p := pf.s.newPipe(target, pf.e, pipe.Request{Payload: req})
|
||||
if debugScheduler {
|
||||
logrus.Debugf("> newPipe %s %p desiredState=%s", ee.Vertex.Name(), p, req.desiredState)
|
||||
}
|
||||
return p.Receiver
|
||||
}
|
||||
|
||||
func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, error)) pipe.Receiver {
|
||||
p := pf.s.newRequestWithFunc(pf.e, f)
|
||||
if debugScheduler {
|
||||
logrus.Debugf("> newFunc %p", p)
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pipe.Receiver) {
|
||||
if !debugScheduler {
|
||||
return
|
||||
}
|
||||
logrus.Debugf(">> unpark %s req=%d upt=%d out=%d state=%s %s", e.edge.Vertex.Name(), len(inc), len(updates), len(allPipes), e.state, e.edge.Vertex.Digest())
|
||||
|
||||
for i, dep := range e.deps {
|
||||
des := edgeStatusInitial
|
||||
if dep.req != nil {
|
||||
des = dep.req.Request().(*edgeRequest).desiredState
|
||||
}
|
||||
logrus.Debugf(":: dep%d %s state=%s des=%s keys=%d hasslowcache=%v", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des, len(dep.keys), e.slowCacheFunc(dep) != nil)
|
||||
}
|
||||
|
||||
for i, in := range inc {
|
||||
req := in.Request()
|
||||
logrus.Debugf("> incoming-%d: %p dstate=%s canceled=%v", i, in, req.Payload.(*edgeRequest).desiredState, req.Canceled)
|
||||
}
|
||||
|
||||
for i, up := range updates {
|
||||
if up == e.cacheMapReq {
|
||||
logrus.Debugf("> update-%d: %p cacheMapReq complete=%v", i, up, up.Status().Completed)
|
||||
} else if up == e.execReq {
|
||||
logrus.Debugf("> update-%d: %p execReq complete=%v", i, up, up.Status().Completed)
|
||||
} else {
|
||||
st, ok := up.Status().Value.(*edgeState)
|
||||
if ok {
|
||||
index := -1
|
||||
if dep, ok := e.depRequests[up]; ok {
|
||||
index = int(dep.index)
|
||||
}
|
||||
logrus.Debugf("> update-%d: %p input-%d keys=%d state=%s", i, up, index, len(st.keys), st.state)
|
||||
} else {
|
||||
logrus.Debugf("> update-%d: unknown", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func debugSchedulerPostUnpark(e *edge, inc []pipe.Sender) {
|
||||
if !debugScheduler {
|
||||
return
|
||||
}
|
||||
for i, in := range inc {
|
||||
logrus.Debugf("< incoming-%d: %p completed=%v", i, in, in.Status().Completed)
|
||||
}
|
||||
logrus.Debugf("<< unpark %s\n", e.edge.Vertex.Name())
|
||||
}
|
Reference in New Issue
Block a user