Support concurrency in InMemoryDB and reach feature parity with boltdb

This commit is contained in:
Ettore Di Giacinto
2019-11-02 17:56:43 +01:00
parent c04e6496fb
commit 4fbe84b30b
3 changed files with 178 additions and 23 deletions

View File

@@ -19,14 +19,18 @@ import (
"errors" "errors"
"os" "os"
"strconv" "strconv"
"sync"
"time"
storm "github.com/asdine/storm" storm "github.com/asdine/storm"
"github.com/asdine/storm/q" "github.com/asdine/storm/q"
"go.etcd.io/bbolt"
) )
//var BoltInstance PackageDatabase //var BoltInstance PackageDatabase
type BoltDatabase struct { type BoltDatabase struct {
sync.Mutex
Path string Path string
} }
@@ -57,7 +61,7 @@ func (db *BoltDatabase) Retrieve(ID string) ([]byte, error) {
func (db *BoltDatabase) FindPackage(tofind Package) (Package, error) { func (db *BoltDatabase) FindPackage(tofind Package) (Package, error) {
p := &DefaultPackage{} p := &DefaultPackage{}
bolt, err := storm.Open(db.Path) bolt, err := storm.Open(db.Path, storm.BoltOptions(0600, &bbolt.Options{Timeout: 30 * time.Second}))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -72,7 +76,7 @@ func (db *BoltDatabase) FindPackage(tofind Package) (Package, error) {
func (db *BoltDatabase) UpdatePackage(p Package) error { func (db *BoltDatabase) UpdatePackage(p Package) error {
bolt, err := storm.Open(db.Path) bolt, err := storm.Open(db.Path, storm.BoltOptions(0600, &bbolt.Options{Timeout: 30 * time.Second}))
if err != nil { if err != nil {
return err return err
} }
@@ -87,12 +91,12 @@ func (db *BoltDatabase) UpdatePackage(p Package) error {
return err return err
} }
return err return nil
} }
func (db *BoltDatabase) GetPackage(ID string) (Package, error) { func (db *BoltDatabase) GetPackage(ID string) (Package, error) {
p := &DefaultPackage{} p := &DefaultPackage{}
bolt, err := storm.Open(db.Path) bolt, err := storm.Open(db.Path, storm.BoltOptions(0600, &bbolt.Options{Timeout: 30 * time.Second}))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -107,7 +111,7 @@ func (db *BoltDatabase) GetPackage(ID string) (Package, error) {
func (db *BoltDatabase) GetPackages() []string { func (db *BoltDatabase) GetPackages() []string {
ids := []string{} ids := []string{}
bolt, err := storm.Open(db.Path) bolt, err := storm.Open(db.Path, storm.BoltOptions(0600, &bbolt.Options{Timeout: 30 * time.Second}))
if err != nil { if err != nil {
return []string{} return []string{}
} }
@@ -124,25 +128,37 @@ func (db *BoltDatabase) GetPackages() []string {
} }
func (db *BoltDatabase) GetAllPackages(packages chan Package) error { func (db *BoltDatabase) GetAllPackages(packages chan Package) error {
bolt, err := storm.Open(db.Path) bolt, err := storm.Open(db.Path, storm.BoltOptions(0600, &bbolt.Options{Timeout: 30 * time.Second}))
if err != nil { if err != nil {
return err return err
} }
defer bolt.Close() defer bolt.Close()
// Fetching records one by one (useful when the bucket contains a lot of records) // Fetching records one by one (useful when the bucket contains a lot of records)
query := bolt.Select() //query := bolt.Select()
return query.Each(new(DefaultPackage), func(record interface{}) error { var packs []Package
u := record.(*DefaultPackage) err = bolt.All(&packs)
packages <- u if err != nil {
return err return err
}) }
for _, r := range packs {
packages <- r
}
return nil
// return query.Each(new(DefaultPackage), func(record interface{}) error {
// u := record.(*DefaultPackage)
// packages <- u
// return err
// })
} }
// Encode encodes the package to string. // Encode encodes the package to string.
// It returns an ID which can be used to retrieve the package later on. // It returns an ID which can be used to retrieve the package later on.
func (db *BoltDatabase) CreatePackage(p Package) (string, error) { func (db *BoltDatabase) CreatePackage(p Package) (string, error) {
bolt, err := storm.Open(db.Path) bolt, err := storm.Open(db.Path, storm.BoltOptions(0600, &bbolt.Options{Timeout: 30 * time.Second}))
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -161,5 +177,7 @@ func (db *BoltDatabase) CreatePackage(p Package) (string, error) {
} }
func (db *BoltDatabase) Clean() error { func (db *BoltDatabase) Clean() error {
db.Lock()
defer db.Unlock()
return os.RemoveAll(db.Path) return os.RemoveAll(db.Path)
} }

View File

@@ -21,24 +21,36 @@ import (
"errors" "errors"
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"sync"
) )
var DBInMemoryInstance PackageDatabase var DBInMemoryInstance PackageDatabase
type InMemoryDatabase struct { type InMemoryDatabase struct {
*sync.Mutex
Database map[string]string Database map[string]string
} }
func NewInMemoryDatabase() PackageDatabase { func NewInMemoryDatabase(singleton bool) PackageDatabase {
// In memoryDB is a singleton // In memoryDB is a singleton
if DBInMemoryInstance == nil { if singleton && DBInMemoryInstance == nil {
DBInMemoryInstance = &InMemoryDatabase{Database: map[string]string{}} DBInMemoryInstance = &InMemoryDatabase{
Mutex: &sync.Mutex{},
Database: map[string]string{}}
}
if !singleton {
return &InMemoryDatabase{
Mutex: &sync.Mutex{},
Database: map[string]string{}}
} }
return DBInMemoryInstance return DBInMemoryInstance
} }
func (db *InMemoryDatabase) Get(s string) (string, error) { func (db *InMemoryDatabase) Get(s string) (string, error) {
db.Lock()
defer db.Unlock()
pa, ok := db.Database[s] pa, ok := db.Database[s]
if !ok { if !ok {
return "", errors.New("No key found with that id") return "", errors.New("No key found with that id")
@@ -47,6 +59,8 @@ func (db *InMemoryDatabase) Get(s string) (string, error) {
} }
func (db *InMemoryDatabase) Set(k, v string) error { func (db *InMemoryDatabase) Set(k, v string) error {
db.Lock()
defer db.Unlock()
db.Database[k] = v db.Database[k] = v
return nil return nil
@@ -55,7 +69,7 @@ func (db *InMemoryDatabase) Set(k, v string) error {
func (db *InMemoryDatabase) Create(v []byte) (string, error) { func (db *InMemoryDatabase) Create(v []byte) (string, error) {
enc := base64.StdEncoding.EncodeToString(v) enc := base64.StdEncoding.EncodeToString(v)
crc32q := crc32.MakeTable(0xD5828281) crc32q := crc32.MakeTable(0xD5828281)
ID := fmt.Sprintf("%08x", crc32.Checksum([]byte(enc), crc32q)) ID := fmt.Sprintf("%08x", crc32.Checksum([]byte(enc), crc32q)) // TODO: Replace with package fingerprint?
return ID, db.Set(ID, enc) return ID, db.Set(ID, enc)
} }
@@ -88,15 +102,21 @@ func (db *InMemoryDatabase) GetPackage(ID string) (Package, error) {
return p, nil return p, nil
} }
// Not implemented
func (db *InMemoryDatabase) GetAllPackages(packages chan Package) error { func (db *InMemoryDatabase) GetAllPackages(packages chan Package) error {
return errors.New("Not implemented") packs := db.GetPackages()
for _, p := range packs {
pack, err := db.GetPackage(p)
if err != nil {
return err
}
packages <- pack
}
return nil
} }
// Encode encodes the package to string. // Encode encodes the package to string.
// It returns an ID which can be used to retrieve the package later on. // It returns an ID which can be used to retrieve the package later on.
func (db *InMemoryDatabase) CreatePackage(p Package) (string, error) { func (db *InMemoryDatabase) CreatePackage(p Package) (string, error) {
pd, ok := p.(*DefaultPackage) pd, ok := p.(*DefaultPackage)
if !ok { if !ok {
return "", errors.New("InMemoryDatabase suports only DefaultPackage") return "", errors.New("InMemoryDatabase suports only DefaultPackage")
@@ -114,16 +134,73 @@ func (db *InMemoryDatabase) CreatePackage(p Package) (string, error) {
return ID, nil return ID, nil
} }
func (db *InMemoryDatabase) encodePackage(p Package) (string, string, error) {
pd, ok := p.(*DefaultPackage)
if !ok {
return "", "", errors.New("InMemoryDatabase suports only DefaultPackage")
}
res, err := json.Marshal(pd)
if err != nil {
return "", "", err
}
enc := base64.StdEncoding.EncodeToString(res)
crc32q := crc32.MakeTable(0xD5828281)
ID := fmt.Sprintf("%08x", crc32.Checksum([]byte(enc), crc32q)) // TODO: Replace with package fingerprint?
return ID, enc, nil
}
func (db *InMemoryDatabase) FindPackage(p Package) (Package, error) { func (db *InMemoryDatabase) FindPackage(p Package) (Package, error) {
return nil, errors.New("Not implemented")
// TODO: Replace this piece, when IDs are fingerprint, findpackage becames O(1)
for _, k := range db.GetPackages() {
pack, err := db.GetPackage(k)
if err != nil {
return nil, err
}
if pack.GetFingerPrint() == p.GetFingerPrint() {
return pack, nil
}
}
return nil, errors.New("Package not found")
} }
func (db *InMemoryDatabase) UpdatePackage(p Package) error { func (db *InMemoryDatabase) UpdatePackage(p Package) error {
return errors.New("Not implemented") var id string
found := false
for _, k := range db.GetPackages() {
pack, err := db.GetPackage(k)
if err != nil {
return err
}
if pack.GetFingerPrint() == p.GetFingerPrint() {
id = k
found = true
break
}
}
if found {
_, enc, err := db.encodePackage(p)
if err != nil {
return err
}
return db.Set(id, enc)
}
return errors.New("Package not found")
} }
func (db *InMemoryDatabase) GetPackages() []string { func (db *InMemoryDatabase) GetPackages() []string {
return []string{} keys := []string{}
db.Lock()
defer db.Unlock()
for k, _ := range db.Database {
keys = append(keys, k)
}
return keys
} }
func (db *InMemoryDatabase) Clean() error { func (db *InMemoryDatabase) Clean() error {

View File

@@ -0,0 +1,60 @@
// Copyright © 2019 Ettore Di Giacinto <mudler@gentoo.org>
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along
// with this program; if not, see <http://www.gnu.org/licenses/>.
package pkg_test
import (
. "github.com/mudler/luet/pkg/package"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Database", func() {
db := NewInMemoryDatabase(false)
Context("Simple package", func() {
a := NewPackage("A", ">=1.0", []*DefaultPackage{}, []*DefaultPackage{})
// a1 := NewPackage("A", "1.0", []*DefaultPackage{}, []*DefaultPackage{})
// a11 := NewPackage("A", "1.1", []*DefaultPackage{}, []*DefaultPackage{})
// a01 := NewPackage("A", "0.1", []*DefaultPackage{}, []*DefaultPackage{})
It("Saves and get data back correctly", func() {
ID, err := db.CreatePackage(a)
Expect(err).ToNot(HaveOccurred())
pack, err := db.GetPackage(ID)
Expect(err).ToNot(HaveOccurred())
Expect(pack).To(Equal(a))
})
It("Gets all", func() {
ids := db.GetPackages()
Expect(ids).To(Equal([]string{"9164d667"}))
})
It("Find packages", func() {
pack, err := db.FindPackage(a)
Expect(err).ToNot(HaveOccurred())
Expect(pack).To(Equal(a))
})
})
})