mirror of
https://github.com/mudler/luet.git
synced 2025-09-03 08:14:46 +00:00
Make Compile work in parallel
Also make the spinner thread-safe
This commit is contained in:
@@ -16,14 +16,15 @@
|
||||
package compiler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/mudler/luet/pkg/helpers"
|
||||
pkg "github.com/mudler/luet/pkg/package"
|
||||
"github.com/mudler/luet/pkg/tree"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const BuildFile = "build.yaml"
|
||||
@@ -43,6 +44,49 @@ func NewLuetCompiler(backend CompilerBackend, t pkg.Tree) Compiler {
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *LuetCompiler) compilerWorker(i int, wg *sync.WaitGroup, cspecs chan CompilationSpec, a *[]Artifact, m *sync.Mutex, concurrency int, keepPermissions bool, errors chan error) {
|
||||
defer wg.Done()
|
||||
|
||||
for s := range cspecs {
|
||||
ar, err := cs.Compile(concurrency, keepPermissions, s)
|
||||
if err != nil {
|
||||
errors <- err
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
*a = append(*a, ar)
|
||||
m.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *LuetCompiler) CompileParallel(concurrency int, keepPermissions bool, ps []CompilationSpec) ([]Artifact, []error) {
|
||||
all := make(chan CompilationSpec)
|
||||
artifacts := []Artifact{}
|
||||
mutex := &sync.Mutex{}
|
||||
errors := make(chan error, len(ps))
|
||||
var wg = new(sync.WaitGroup)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
wg.Add(1)
|
||||
go cs.compilerWorker(i, wg, all, &artifacts, mutex, concurrency, keepPermissions, errors)
|
||||
}
|
||||
|
||||
for _, p := range ps {
|
||||
all <- p
|
||||
}
|
||||
|
||||
close(all)
|
||||
wg.Wait()
|
||||
close(errors)
|
||||
|
||||
var allErrors []error
|
||||
|
||||
for e := range errors {
|
||||
allErrors = append(allErrors, e)
|
||||
}
|
||||
|
||||
return artifacts, allErrors
|
||||
}
|
||||
|
||||
func (cs *LuetCompiler) Compile(concurrency int, keepPermissions bool, p CompilationSpec) (Artifact, error) {
|
||||
|
||||
// - If image is not set, we read a base_image. Then we will build one image from it to kick-off our build based
|
||||
@@ -62,7 +106,7 @@ func (cs *LuetCompiler) Compile(concurrency int, keepPermissions bool, p Compila
|
||||
// First we copy the source definitions into the output - we create a copy which the builds will need (we need to cache this phase somehow)
|
||||
err := helpers.CopyDir(p.GetPackage().GetPath(), buildDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Could not copy package sources")
|
||||
|
||||
}
|
||||
|
||||
@@ -76,12 +120,12 @@ func (cs *LuetCompiler) Compile(concurrency int, keepPermissions bool, p Compila
|
||||
}
|
||||
err = cs.Backend.BuildImage(builderOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Could not build image")
|
||||
}
|
||||
|
||||
err = cs.Backend.ExportImage(builderOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Could not export image")
|
||||
}
|
||||
|
||||
// Then we write the step image, which uses the builder one
|
||||
@@ -94,18 +138,18 @@ func (cs *LuetCompiler) Compile(concurrency int, keepPermissions bool, p Compila
|
||||
}
|
||||
err = cs.Backend.ImageDefinitionToTar(runnerOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Could not export image to tar")
|
||||
}
|
||||
|
||||
diffs, err := cs.Backend.Changes(p.Rel(p.GetPackage().GetFingerPrint()+"-builder.image.tar"), p.Rel(p.GetPackage().GetFingerPrint()+".image.tar"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Could not generate changes from layers")
|
||||
}
|
||||
|
||||
// TODO: Handle caching and optionally do not remove things
|
||||
err = cs.Backend.RemoveImage(builderOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Could not remove image")
|
||||
}
|
||||
|
||||
rootfs, err := ioutil.TempDir(p.GetOutputPath(), "rootfs")
|
||||
@@ -114,18 +158,18 @@ func (cs *LuetCompiler) Compile(concurrency int, keepPermissions bool, p Compila
|
||||
// TODO: Compression and such
|
||||
err = cs.Backend.ExtractRootfs(CompilerBackendOptions{SourcePath: runnerOpts.Destination, Destination: rootfs}, keepPermissions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Could not extract rootfs")
|
||||
}
|
||||
artifact, err := ExtractArtifactFromDelta(rootfs, p.Rel(p.GetPackage().GetFingerPrint()+".package.tar"), diffs, concurrency, keepPermissions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Could not generate deltas")
|
||||
}
|
||||
|
||||
return artifact, nil
|
||||
}
|
||||
|
||||
// TODO: Solve - hash
|
||||
return nil, errors.New("Not implemented yet")
|
||||
return nil, errors.New("Image build with a seed image is not implemented yet")
|
||||
}
|
||||
|
||||
func (cs *LuetCompiler) FromPackage(p pkg.Package) (CompilationSpec, error) {
|
||||
|
@@ -70,4 +70,38 @@ var _ = Describe("Compiler", func() {
|
||||
|
||||
})
|
||||
})
|
||||
|
||||
Context("Simple package build definition", func() {
|
||||
It("Compiles it in parallel", func() {
|
||||
generalRecipe := tree.NewCompilerRecipe()
|
||||
|
||||
err := generalRecipe.Load("../../tests/fixtures/buildable")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(generalRecipe.Tree()).ToNot(BeNil()) // It should be populated back at this point
|
||||
|
||||
Expect(len(generalRecipe.Tree().GetPackageSet().GetPackages())).To(Equal(3))
|
||||
|
||||
compiler := NewLuetCompiler(sd.NewSimpleDockerBackend(), generalRecipe.Tree())
|
||||
spec, err := compiler.FromPackage(&pkg.DefaultPackage{Name: "b", Category: "test", Version: "1.0"})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
spec2, err := compiler.FromPackage(&pkg.DefaultPackage{Name: "a", Category: "test", Version: "1.0"})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
Expect(spec.GetPackage().GetPath()).ToNot(Equal(""))
|
||||
|
||||
tmpdir, err := ioutil.TempDir("", "tree")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
defer os.RemoveAll(tmpdir) // clean up
|
||||
|
||||
spec.SetOutputPath(tmpdir)
|
||||
spec2.SetOutputPath(tmpdir)
|
||||
artifacts, errs := compiler.CompileParallel(2, false, []CompilationSpec{spec, spec2})
|
||||
Expect(len(errs)).To(Equal(0))
|
||||
for _, artifact := range artifacts {
|
||||
Expect(helpers.Exists(artifact.GetPath())).To(BeTrue())
|
||||
Expect(helpers.Untar(artifact.GetPath(), tmpdir, false)).ToNot(HaveOccurred())
|
||||
}
|
||||
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@@ -21,6 +21,7 @@ import (
|
||||
|
||||
type Compiler interface {
|
||||
Compile(int, bool, CompilationSpec) (Artifact, error)
|
||||
CompileParallel(concurrency int, keepPermissions bool, ps []CompilationSpec) ([]Artifact, []error)
|
||||
FromPackage(pkg.Package) (CompilationSpec, error)
|
||||
|
||||
SetBackend(CompilerBackend)
|
||||
|
@@ -113,7 +113,7 @@ func CopyDir(src string, dst string) (err error) {
|
||||
return
|
||||
}
|
||||
if err == nil {
|
||||
return fmt.Errorf("destination already exists")
|
||||
// return fmt.Errorf("destination already exists")
|
||||
}
|
||||
|
||||
err = os.MkdirAll(dst, si.Mode())
|
||||
|
@@ -13,13 +13,19 @@ import (
|
||||
|
||||
var s *spinner.Spinner
|
||||
var m = &sync.Mutex{}
|
||||
var enabled = false
|
||||
|
||||
func Spinner(i int) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
if i > 43 {
|
||||
i = 43
|
||||
}
|
||||
|
||||
if s == nil {
|
||||
s = spinner.New(spinner.CharSets[i], 100*time.Millisecond) // Build our new spinner
|
||||
}
|
||||
enabled = true
|
||||
s.Start() // Start the spinner
|
||||
}
|
||||
|
||||
@@ -31,8 +37,10 @@ func SpinnerText(suffix, prefix string) {
|
||||
}
|
||||
|
||||
func SpinnerStop() {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
s.Stop()
|
||||
s = nil
|
||||
enabled = false
|
||||
}
|
||||
|
||||
func msg(level string, msg ...interface{}) {
|
||||
@@ -48,7 +56,7 @@ func msg(level string, msg ...interface{}) {
|
||||
levelMsg = Bold(Red("Error")).BgBlack().String()
|
||||
}
|
||||
|
||||
if s != nil {
|
||||
if enabled {
|
||||
SpinnerText(Sprintf(msg), levelMsg)
|
||||
return
|
||||
}
|
||||
@@ -57,7 +65,8 @@ func msg(level string, msg ...interface{}) {
|
||||
for _, f := range msg {
|
||||
cmd = append(cmd, f)
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
fmt.Println(cmd...)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user