Make the parallel solver completely parallel in building formulas from dataset

This commit is contained in:
Ettore Di Giacinto
2020-10-25 17:20:19 +01:00
committed by Ettore Di Giacinto
parent c5ed36b2bd
commit 9a7d92b02e

View File

@@ -100,7 +100,7 @@ func (s *Parallel) buildParallelFormula(formulas []bf.Formula, packages pkg.Pack
go func() { go func() {
for t := range results { for t := range results {
formulas = append(formulas, t) formulas = append(formulas, t)
wg.Done() // ** move the `Done()` call here wg.Done()
} }
}() }()
@@ -139,19 +139,45 @@ func (s *Parallel) BuildWorld(includeInstalled bool) (bf.Formula, error) {
func (s *Parallel) getList(db pkg.PackageDatabase, lsp pkg.Packages) (pkg.Packages, error) { func (s *Parallel) getList(db pkg.PackageDatabase, lsp pkg.Packages) (pkg.Packages, error) {
var ls pkg.Packages var ls pkg.Packages
for _, pp := range lsp { var wg = new(sync.WaitGroup)
cp, err := db.FindPackage(pp) all := make(chan pkg.Package)
if err != nil { results := make(chan pkg.Package, 1)
packages, err := pp.Expand(db) for i := 0; i < s.Concurrency; i++ {
// Expand, and relax search - if not found pick the same one wg.Add(1)
if err != nil || len(packages) == 0 { go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
cp = pp //defer wg.Done()
} else { for p := range c {
cp = packages.Best(nil) cp, err := db.FindPackage(p)
if err != nil {
packages, err := p.Expand(db)
// Expand, and relax search - if not found pick the same one
if err != nil || len(packages) == 0 {
cp = p
} else {
cp = packages.Best(nil)
}
}
ls = append(ls, cp)
results <- cp
} }
} }(wg, all)
ls = append(ls, cp)
} }
go func() {
for t := range results {
ls = append(ls, t)
wg.Done()
}
}()
for _, pp := range lsp {
all <- pp
}
close(all)
wg.Wait()
return ls, nil return ls, nil
} }
@@ -196,8 +222,6 @@ func (s *Parallel) ConflictsWith(pack pkg.Package, lsp pkg.Packages) (bool, erro
p, err := s.DefinitionDatabase.FindPackage(pack) p, err := s.DefinitionDatabase.FindPackage(pack)
if err != nil { if err != nil {
p = pack //Relax search, otherwise we cannot compute solutions for packages not in definitions p = pack //Relax search, otherwise we cannot compute solutions for packages not in definitions
// return false, errors.Wrap(err, "Package not found in definition db")
} }
ls, err := s.getList(s.DefinitionDatabase, lsp) ls, err := s.getList(s.DefinitionDatabase, lsp)
@@ -223,23 +247,50 @@ func (s *Parallel) ConflictsWith(pack pkg.Package, lsp pkg.Packages) (bool, erro
} }
formulas = append(formulas, bf.And(bf.Not(P), r)) formulas = append(formulas, bf.And(bf.Not(P), r))
for _, i := range ls { var wg = new(sync.WaitGroup)
if i.Matches(p) {
continue
}
// XXX: Skip check on any of its requires ? ( Drop to avoid removing system packages when selecting an uninstall)
// if i.RequiresContains(p) {
// fmt.Println("Requires found")
// continue
// }
encodedI, err := i.Encode(s.ParallelDatabase) all := make(chan pkg.Package)
if err != nil { results := make(chan bf.Formula, 1)
return false, err for i := 0; i < s.Concurrency; i++ {
} wg.Add(1)
I := bf.Var(encodedI) go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
formulas = append(formulas, bf.And(I, r)) //defer wg.Done()
for i := range c {
if i.Matches(p) {
continue
}
// XXX: Skip check on any of its requires ? ( Drop to avoid removing system packages when selecting an uninstall)
// if i.RequiresContains(p) {
// fmt.Println("Requires found")
// continue
// }
encodedI, err := i.Encode(s.ParallelDatabase)
if err != nil {
panic(err)
}
I := bf.Var(encodedI)
results <- bf.And(I, r)
}
}(wg, all)
} }
go func() {
for t := range results {
formulas = append(formulas, t)
wg.Done()
}
}()
for _, p := range ls {
all <- p
}
close(all)
wg.Wait()
model := bf.Solve(bf.And(formulas...)) model := bf.Solve(bf.And(formulas...))
if model == nil { if model == nil {
return true, nil return true, nil
@@ -328,24 +379,47 @@ func (s *Parallel) UpgradeUniverse(dropremoved bool) (pkg.Packages, PackagesAsse
universe.CreatePackage(p) universe.CreatePackage(p)
} }
var wg = new(sync.WaitGroup)
all := make(chan pkg.Package)
results := make(chan []pkg.Package, 1)
for i := 0; i < s.Concurrency; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
//defer wg.Done()
for p := range c {
available, err := universe.FindPackageVersions(p)
if err != nil {
removed = append(removed, p)
}
if len(available) == 0 {
continue
}
bestmatch := available.Best(nil)
// Found a better version available
if !bestmatch.Matches(p) {
results <- []pkg.Package{p, bestmatch}
}
}
}(wg, all)
}
go func() {
for t := range results {
notUptodate = append(notUptodate, t[0])
toUpgrade = append(toUpgrade, t[1])
wg.Done()
}
}()
// Grab all the installed ones, see if they are eligible for update // Grab all the installed ones, see if they are eligible for update
for _, p := range s.Installed() { for _, p := range s.Installed() {
available, err := universe.FindPackageVersions(p) all <- p
if err != nil {
removed = append(removed, p)
}
if len(available) == 0 {
continue
}
bestmatch := available.Best(nil)
// Found a better version available
if !bestmatch.Matches(p) {
notUptodate = append(notUptodate, p)
toUpgrade = append(toUpgrade, bestmatch)
}
} }
close(all)
wg.Wait()
// resolve to packages from the db to be able to encode correctly // resolve to packages from the db to be able to encode correctly
oldPackages, err := s.getList(universe, notUptodate) oldPackages, err := s.getList(universe, notUptodate)
if err != nil { if err != nil {
@@ -584,29 +658,54 @@ func (s *Parallel) BuildFormula() (bf.Formula, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, wanted := range s.Wanted {
encodedW, err := wanted.Encode(s.ParallelDatabase)
if err != nil {
return nil, err
}
W := bf.Var(encodedW)
installedWorld := s.Installed()
//TODO:Optimize
if len(installedWorld) == 0 {
formulas = append(formulas, W) //bf.And(bf.True, W))
continue
}
for _, installed := range installedWorld { var wg = new(sync.WaitGroup)
encodedI, err := installed.Encode(s.ParallelDatabase)
if err != nil { all := make(chan pkg.Package)
return nil, err results := make(chan bf.Formula, 1)
for i := 0; i < s.Concurrency; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup, c <-chan pkg.Package) {
//defer wg.Done()
for wanted := range c {
encodedW, err := wanted.Encode(s.ParallelDatabase)
if err != nil {
panic(err)
}
W := bf.Var(encodedW)
installedWorld := s.Installed()
//TODO:Optimize
if len(installedWorld) == 0 {
results <- W
continue
}
for _, installed := range installedWorld {
encodedI, err := installed.Encode(s.ParallelDatabase)
if err != nil {
panic(err)
}
I := bf.Var(encodedI)
results <- bf.And(W, I)
}
} }
I := bf.Var(encodedI) }(wg, all)
formulas = append(formulas, bf.And(W, I))
}
} }
go func() {
for t := range results {
formulas = append(formulas, t)
wg.Done()
}
}()
for _, wanted := range s.Wanted {
all <- wanted
}
close(all)
wg.Wait()
formulas = append(formulas, r) formulas = append(formulas, r)
return bf.And(formulas...), nil return bf.And(formulas...), nil