Add tests and various fixes to parallel implementation

This commit is contained in:
Ettore Di Giacinto
2020-10-25 18:13:29 +01:00
committed by Ettore Di Giacinto
parent 9a7d92b02e
commit 9f1a182eee
2 changed files with 1317 additions and 24 deletions

View File

@@ -78,13 +78,14 @@ func (s *Parallel) noRulesInstalled() bool {
func (s *Parallel) buildParallelFormula(formulas []bf.Formula, packages pkg.Packages) (bf.Formula, error) { func (s *Parallel) buildParallelFormula(formulas []bf.Formula, packages pkg.Packages) (bf.Formula, error) {
var wg = new(sync.WaitGroup) var wg = new(sync.WaitGroup)
var wg2 = new(sync.WaitGroup)
all := make(chan pkg.Package) all := make(chan pkg.Package)
results := make(chan bf.Formula, 1) results := make(chan bf.Formula, 1)
for i := 0; i < s.Concurrency; i++ { for i := 0; i < s.Concurrency; i++ {
wg.Add(1) wg.Add(1)
go func(wg *sync.WaitGroup, c <-chan pkg.Package) { go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
//defer wg.Done() defer wg.Done()
for p := range c { for p := range c {
solvable, err := p.BuildFormula(s.DefinitionDatabase, s.ParallelDatabase) solvable, err := p.BuildFormula(s.DefinitionDatabase, s.ParallelDatabase)
if err != nil { if err != nil {
@@ -96,11 +97,11 @@ func (s *Parallel) buildParallelFormula(formulas []bf.Formula, packages pkg.Pack
} }
}(wg, all) }(wg, all)
} }
wg2.Add(1)
go func() { go func() {
defer wg2.Done()
for t := range results { for t := range results {
formulas = append(formulas, t) formulas = append(formulas, t)
wg.Done()
} }
}() }()
@@ -110,7 +111,8 @@ func (s *Parallel) buildParallelFormula(formulas []bf.Formula, packages pkg.Pack
close(all) close(all)
wg.Wait() wg.Wait()
close(results)
wg2.Wait()
return bf.And(formulas...), nil return bf.And(formulas...), nil
} }
@@ -138,14 +140,15 @@ func (s *Parallel) BuildWorld(includeInstalled bool) (bf.Formula, error) {
func (s *Parallel) getList(db pkg.PackageDatabase, lsp pkg.Packages) (pkg.Packages, error) { func (s *Parallel) getList(db pkg.PackageDatabase, lsp pkg.Packages) (pkg.Packages, error) {
var ls pkg.Packages var ls pkg.Packages
var wg = new(sync.WaitGroup) var wg = new(sync.WaitGroup)
var wg2 = new(sync.WaitGroup)
all := make(chan pkg.Package) all := make(chan pkg.Package)
results := make(chan pkg.Package, 1) results := make(chan pkg.Package, 1)
for i := 0; i < s.Concurrency; i++ { for i := 0; i < s.Concurrency; i++ {
wg.Add(1) wg.Add(1)
go func(wg *sync.WaitGroup, c <-chan pkg.Package) { go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
//defer wg.Done() defer wg.Done()
for p := range c { for p := range c {
cp, err := db.FindPackage(p) cp, err := db.FindPackage(p)
if err != nil { if err != nil {
@@ -157,19 +160,18 @@ func (s *Parallel) getList(db pkg.PackageDatabase, lsp pkg.Packages) (pkg.Packag
cp = packages.Best(nil) cp = packages.Best(nil)
} }
} }
ls = append(ls, cp)
results <- cp results <- cp
} }
}(wg, all) }(wg, all)
} }
go func() { wg2.Add(1)
go func(wg *sync.WaitGroup) {
defer wg2.Done()
for t := range results { for t := range results {
ls = append(ls, t) ls = append(ls, t)
wg.Done()
} }
}() }(wg)
for _, pp := range lsp { for _, pp := range lsp {
all <- pp all <- pp
@@ -177,6 +179,8 @@ func (s *Parallel) getList(db pkg.PackageDatabase, lsp pkg.Packages) (pkg.Packag
close(all) close(all)
wg.Wait() wg.Wait()
close(results)
wg2.Wait()
return ls, nil return ls, nil
} }
@@ -248,13 +252,14 @@ func (s *Parallel) ConflictsWith(pack pkg.Package, lsp pkg.Packages) (bool, erro
formulas = append(formulas, bf.And(bf.Not(P), r)) formulas = append(formulas, bf.And(bf.Not(P), r))
var wg = new(sync.WaitGroup) var wg = new(sync.WaitGroup)
var wg2 = new(sync.WaitGroup)
all := make(chan pkg.Package) all := make(chan pkg.Package)
results := make(chan bf.Formula, 1) results := make(chan bf.Formula, 1)
for i := 0; i < s.Concurrency; i++ { for i := 0; i < s.Concurrency; i++ {
wg.Add(1) wg.Add(1)
go func(wg *sync.WaitGroup, c <-chan pkg.Package) { go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
//defer wg.Done() defer wg.Done()
for i := range c { for i := range c {
if i.Matches(p) { if i.Matches(p) {
continue continue
@@ -277,10 +282,11 @@ func (s *Parallel) ConflictsWith(pack pkg.Package, lsp pkg.Packages) (bool, erro
}(wg, all) }(wg, all)
} }
wg2.Add(1)
go func() { go func() {
defer wg2.Done()
for t := range results { for t := range results {
formulas = append(formulas, t) formulas = append(formulas, t)
wg.Done()
} }
}() }()
@@ -290,6 +296,8 @@ func (s *Parallel) ConflictsWith(pack pkg.Package, lsp pkg.Packages) (bool, erro
close(all) close(all)
wg.Wait() wg.Wait()
close(results)
wg2.Wait()
model := bf.Solve(bf.And(formulas...)) model := bf.Solve(bf.And(formulas...))
if model == nil { if model == nil {
@@ -380,16 +388,18 @@ func (s *Parallel) UpgradeUniverse(dropremoved bool) (pkg.Packages, PackagesAsse
} }
var wg = new(sync.WaitGroup) var wg = new(sync.WaitGroup)
var wg2 = new(sync.WaitGroup)
all := make(chan pkg.Package) all := make(chan pkg.Package)
results := make(chan []pkg.Package, 1) results := make(chan []pkg.Package, 1)
for i := 0; i < s.Concurrency; i++ { for i := 0; i < s.Concurrency; i++ {
wg.Add(1) wg.Add(1)
go func(wg *sync.WaitGroup, c <-chan pkg.Package) { go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
//defer wg.Done() defer wg.Done()
for p := range c { for p := range c {
available, err := universe.FindPackageVersions(p) available, err := universe.FindPackageVersions(p)
if err != nil { if err != nil {
removed = append(removed, p) removed = append(removed, p) /// FIXME: Racy
} }
if len(available) == 0 { if len(available) == 0 {
continue continue
@@ -404,11 +414,12 @@ func (s *Parallel) UpgradeUniverse(dropremoved bool) (pkg.Packages, PackagesAsse
}(wg, all) }(wg, all)
} }
wg2.Add(1)
go func() { go func() {
defer wg2.Done()
for t := range results { for t := range results {
notUptodate = append(notUptodate, t[0]) notUptodate = append(notUptodate, t[0])
toUpgrade = append(toUpgrade, t[1]) toUpgrade = append(toUpgrade, t[1])
wg.Done()
} }
}() }()
@@ -419,6 +430,8 @@ func (s *Parallel) UpgradeUniverse(dropremoved bool) (pkg.Packages, PackagesAsse
close(all) close(all)
wg.Wait() wg.Wait()
close(results)
wg2.Wait()
// resolve to packages from the db to be able to encode correctly // resolve to packages from the db to be able to encode correctly
oldPackages, err := s.getList(universe, notUptodate) oldPackages, err := s.getList(universe, notUptodate)
@@ -485,6 +498,8 @@ func (s *Parallel) UpgradeUniverse(dropremoved bool) (pkg.Packages, PackagesAsse
return markedForRemoval, assertion, nil return markedForRemoval, assertion, nil
} }
// Upgrade compute upgrades of the package against the world definition.
// It accepts two boolean indicating if it has to check for conflicts or try to attempt a full upgrade
func (s *Parallel) Upgrade(checkconflicts, full bool) (pkg.Packages, PackagesAssertions, error) { func (s *Parallel) Upgrade(checkconflicts, full bool) (pkg.Packages, PackagesAssertions, error) {
// First get candidates that needs to be upgraded.. // First get candidates that needs to be upgraded..
@@ -501,13 +516,14 @@ func (s *Parallel) Upgrade(checkconflicts, full bool) (pkg.Packages, PackagesAss
installedcopy := pkg.NewInMemoryDatabase(false) installedcopy := pkg.NewInMemoryDatabase(false)
var wg = new(sync.WaitGroup) var wg = new(sync.WaitGroup)
var wg2 = new(sync.WaitGroup)
all := make(chan pkg.Package) all := make(chan pkg.Package)
results := make(chan []pkg.Package, 1) results := make(chan []pkg.Package, 1)
for i := 0; i < s.Concurrency; i++ { for i := 0; i < s.Concurrency; i++ {
wg.Add(1) wg.Add(1)
go func(wg *sync.WaitGroup, c <-chan pkg.Package) { go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
//defer wg.Done() defer wg.Done()
for p := range c { for p := range c {
installedcopy.CreatePackage(p) installedcopy.CreatePackage(p)
packages, ok := availableCache[p.GetName()+p.GetCategory()] packages, ok := availableCache[p.GetName()+p.GetCategory()]
@@ -521,11 +537,12 @@ func (s *Parallel) Upgrade(checkconflicts, full bool) (pkg.Packages, PackagesAss
}(wg, all) }(wg, all)
} }
wg2.Add(1)
go func() { go func() {
defer wg2.Done()
for t := range results { for t := range results {
toUninstall = append(toUninstall, t[0]) toUninstall = append(toUninstall, t[0])
toInstall = append(toUninstall, t[1]) toInstall = append(toInstall, t[1])
wg.Done()
} }
}() }()
@@ -535,8 +552,10 @@ func (s *Parallel) Upgrade(checkconflicts, full bool) (pkg.Packages, PackagesAss
close(all) close(all)
wg.Wait() wg.Wait()
close(results)
wg2.Wait()
s2 := NewSolver(installedcopy, s.DefinitionDatabase, pkg.NewInMemoryDatabase(false)) s2 := &Parallel{Concurrency: s.Concurrency, InstalledDatabase: installedcopy, DefinitionDatabase: s.DefinitionDatabase, ParallelDatabase: pkg.NewInMemoryDatabase(false)}
s2.SetResolver(s.Resolver) s2.SetResolver(s.Resolver)
if !full { if !full {
ass := PackagesAssertions{} ass := PackagesAssertions{}
@@ -610,7 +629,7 @@ func (s *Parallel) Uninstall(c pkg.Package, checkconflicts, full bool) (pkg.Pack
} }
} }
s2 := NewSolver(pkg.NewInMemoryDatabase(false), s.DefinitionDatabase, pkg.NewInMemoryDatabase(false)) s2 := &Parallel{Concurrency: s.Concurrency, InstalledDatabase: pkg.NewInMemoryDatabase(false), DefinitionDatabase: s.DefinitionDatabase, ParallelDatabase: pkg.NewInMemoryDatabase(false)}
s2.SetResolver(s.Resolver) s2.SetResolver(s.Resolver)
// Get the requirements to install the candidate // Get the requirements to install the candidate
asserts, err := s2.Install(pkg.Packages{candidate}) asserts, err := s2.Install(pkg.Packages{candidate})
@@ -660,13 +679,14 @@ func (s *Parallel) BuildFormula() (bf.Formula, error) {
} }
var wg = new(sync.WaitGroup) var wg = new(sync.WaitGroup)
var wg2 = new(sync.WaitGroup)
all := make(chan pkg.Package) all := make(chan pkg.Package)
results := make(chan bf.Formula, 1) results := make(chan bf.Formula, 1)
for i := 0; i < s.Concurrency; i++ { for i := 0; i < s.Concurrency; i++ {
wg.Add(1) wg.Add(1)
go func(wg *sync.WaitGroup, c <-chan pkg.Package) { go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
//defer wg.Done() defer wg.Done()
for wanted := range c { for wanted := range c {
encodedW, err := wanted.Encode(s.ParallelDatabase) encodedW, err := wanted.Encode(s.ParallelDatabase)
if err != nil { if err != nil {
@@ -691,11 +711,11 @@ func (s *Parallel) BuildFormula() (bf.Formula, error) {
} }
}(wg, all) }(wg, all)
} }
wg2.Add(1)
go func() { go func() {
defer wg2.Done()
for t := range results { for t := range results {
formulas = append(formulas, t) formulas = append(formulas, t)
wg.Done()
} }
}() }()
@@ -705,6 +725,8 @@ func (s *Parallel) BuildFormula() (bf.Formula, error) {
close(all) close(all)
wg.Wait() wg.Wait()
close(results)
wg2.Wait()
formulas = append(formulas, r) formulas = append(formulas, r)

1271
pkg/solver/parallel_test.go Normal file

File diff suppressed because it is too large Load Diff