From 46114f12319a08fa68f1b5da465fb15afc002f6b Mon Sep 17 00:00:00 2001 From: Ben Schumacher Date: Fri, 10 Apr 2015 13:52:33 -0600 Subject: [PATCH] Rework BoltDB util functions to use bolt.Tx instead of DB along with updates to existing API that leverage these to use Bolt's transaction wrapping functions. --- datastore/bolt/build.go | 83 +++++++++++++++++++++-------------------- datastore/bolt/repo.go | 45 +++++++++++++++++----- datastore/bolt/task.go | 18 +++++++-- datastore/bolt/token.go | 15 ++++++-- datastore/bolt/user.go | 22 +++++++++-- datastore/bolt/util.go | 49 +++++++----------------- 6 files changed, 136 insertions(+), 96 deletions(-) diff --git a/datastore/bolt/build.go b/datastore/bolt/build.go index b39dcad24..a2bce5f93 100644 --- a/datastore/bolt/build.go +++ b/datastore/bolt/build.go @@ -15,7 +15,11 @@ import ( func (db *DB) GetBuild(repo string, build int) (*common.Build, error) { build_ := &common.Build{} key := []byte(repo + "/" + strconv.Itoa(build)) - err := get(db, bucketBuild, key, build_) + + err := db.View(func(t *bolt.Tx) error { + return get(t, bucketBuild, key, build_) + }) + return build_, err } @@ -41,7 +45,11 @@ func (db *DB) GetBuildLast(repo string) (*common.Build, error) { func (db *DB) GetBuildStatus(repo string, build int, status string) (*common.Status, error) { status_ := &common.Status{} key := []byte(repo + "/" + strconv.Itoa(build) + "/" + status) - err := update(db, bucketBuildStatus, key, status) + + err := db.Update(func(t *bolt.Tx) error { + return update(t, bucketBuildStatus, key, status) + }) + return status_, err } @@ -68,49 +76,36 @@ func (db *DB) GetBuildStatusList(repo string, build int) ([]*common.Status, erro // InsertBuild inserts a new build for the named repository func (db *DB) InsertBuild(repo string, build *common.Build) error { - var seqno int - - t, err := db.Begin(true) - if err != nil { - return err - } key := []byte(repo) - raw := t.Bucket(bucketBuildSeq).Get(key) - if raw != nil { - // convert our raw to an integer value - seqno = int(binary.LittleEndian.Uint32(raw)) - } - // increment the seqno, if no record was found, this starts us at 1 - seqno += 1 + return db.Update(func (t *bolt.Tx) error { + raw, err := raw(t, bucketBuildSeq, key) - // convert our new seqno back to raw value - raw = make([]byte, 4) // TODO: replace magic number 4 (uint32) - binary.LittleEndian.PutUint32(raw, uint32(seqno)) - err = t.Bucket(bucketBuildSeq).Put(key, raw) - if err != nil { - t.Rollback() - return err - } + var next_seq uint32 + switch err { + case ErrKeyNotFound: + next_seq = 1 + case nil: + next_seq = 1 + binary.LittleEndian.Uint32(raw) + default: + return err + } - // fill out build structure - build.Number = seqno - build.Created = time.Now().UTC().Unix() + // covert our seqno to raw value + raw = make([]byte, 4) // TODO(benschumacher) replace magic number 4 (uint32) + binary.LittleEndian.PutUint32(raw, next_seq) + err = t.Bucket(bucketBuildSeq).Put(key, raw) + if err != nil { + return err + } - key = []byte(repo + "/" + strconv.Itoa(build.Number)) - raw, err = encode(build) - if err != nil { - t.Rollback() - return err - } + // fill out the build structure + build.Number = int(next_seq) + build.Created = time.Now().UTC().Unix() - err = t.Bucket(bucketBuild).Put(key, raw) - if err != nil { - t.Rollback() - return err - } - - return t.Commit() + key = []byte(repo + "/" + strconv.Itoa(build.Number)) + return insert(t, bucketBuild, key, build) + }) } // InsertBuildStatus inserts a new build status for the @@ -118,7 +113,10 @@ func (db *DB) InsertBuild(repo string, build *common.Build) error { // exists an error is returned. func (db *DB) InsertBuildStatus(repo string, build int, status *common.Status) error { key := []byte(repo + "/" + strconv.Itoa(build) + "/" + status.Context) - return update(db, bucketBuildStatus, key, status) + + return db.Update(func(t *bolt.Tx) error { + return update(t, bucketBuildStatus, key, status) + }) } // UpdateBuild updates an existing build for the named @@ -127,5 +125,8 @@ func (db *DB) InsertBuildStatus(repo string, build int, status *common.Status) e func (db *DB) UpdateBuild(repo string, build *common.Build) error { key := []byte(repo + "/" + strconv.Itoa(build.Number)) build.Updated = time.Now().UTC().Unix() - return update(db, bucketBuild, key, build) + + return db.Update(func(t *bolt.Tx) error { + return update(t, bucketBuild, key, build) + }) } diff --git a/datastore/bolt/repo.go b/datastore/bolt/repo.go index 37bd21263..0381b51da 100644 --- a/datastore/bolt/repo.go +++ b/datastore/bolt/repo.go @@ -4,13 +4,18 @@ import ( "time" "github.com/drone/drone/common" + "github.com/boltdb/bolt" ) // GetRepo gets the repository by name. func (db *DB) GetRepo(repo string) (*common.Repo, error) { repo_ := &common.Repo{} key := []byte(repo) - err := get(db, bucketRepo, key, repo_) + + err := db.View(func (t *bolt.Tx) error { + return get(t, bucketRepo, key, repo_) + }) + return repo_, err } @@ -19,7 +24,11 @@ func (db *DB) GetRepo(repo string) (*common.Repo, error) { func (db *DB) GetRepoParams(repo string) (map[string]string, error) { params := map[string]string{} key := []byte(repo) - err := get(db, bucketRepoParams, key, ¶ms) + + err := db.View(func (t *bolt.Tx) error { + return get(t, bucketRepoParams, key, ¶ms) + }) + return params, err } @@ -28,7 +37,11 @@ func (db *DB) GetRepoParams(repo string) (map[string]string, error) { func (db *DB) GetRepoKeys(repo string) (*common.Keypair, error) { keypair := &common.Keypair{} key := []byte(repo) - err := get(db, bucketRepoKeys, key, keypair) + + err := db.View(func (t *bolt.Tx) error { + return get(t, bucketRepoKeys, key, keypair) + }) + return keypair, err } @@ -37,7 +50,10 @@ func (db *DB) GetRepoKeys(repo string) (*common.Keypair, error) { func (db *DB) UpdateRepo(repo *common.Repo) error { key := []byte(repo.FullName) repo.Updated = time.Now().UTC().Unix() - return update(db, bucketRepo, key, repo) + + return db.Update(func (t *bolt.Tx) error { + return update(t, bucketRepo, key, repo) + }) } // InsertRepo inserts a repository in the datastore and @@ -46,27 +62,38 @@ func (db *DB) InsertRepo(user *common.User, repo *common.Repo) error { key := []byte(repo.FullName) repo.Created = time.Now().UTC().Unix() repo.Updated = time.Now().UTC().Unix() - // TODO(bradrydzewski) add repo to user index - // TODO(bradrydzewski) add user to repo index - return insert(db, bucketRepo, key, repo) + + return db.Update(func (t *bolt.Tx) error { + // TODO(bradrydzewski) add repo to user index + // TODO(bradrydzewski) add user to repo index + return insert(t, bucketRepo, key, repo) + }) } // UpsertRepoParams inserts or updates the private // environment parameters for the named repository. func (db *DB) UpsertRepoParams(repo string, params map[string]string) error { key := []byte(repo) - return update(db, bucketRepoParams, key, params) + + return db.Update(func (t *bolt.Tx) error { + return update(t, bucketRepoParams, key, params) + }) } // UpsertRepoKeys inserts or updates the private and // public keypair for the named repository. func (db *DB) UpsertRepoKeys(repo string, keypair *common.Keypair) error { key := []byte(repo) - return update(db, bucketRepoKeys, key, keypair) + + return db.Update(func (t *bolt.Tx) error { + return update(t, bucketRepoKeys, key, keypair) + }) } // DeleteRepo deletes the repository. func (db *DB) DeleteRepo(repo *common.Repo) error { + //TODO(benschumacher) rework this to use BoltDB's txn wrapper + t, err := db.Begin(true) if err != nil { return err diff --git a/datastore/bolt/task.go b/datastore/bolt/task.go index 6d5b5e332..684d2f4e0 100644 --- a/datastore/bolt/task.go +++ b/datastore/bolt/task.go @@ -4,6 +4,7 @@ import ( "strconv" "github.com/drone/drone/common" + "github.com/boltdb/bolt" ) // GetTask gets the task at index N for the named @@ -11,7 +12,9 @@ import ( func (db *DB) GetTask(repo string, build int, task int) (*common.Task, error) { key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task)) task_ := &common.Task{} - err := get(db, bucketBuildTasks, key, task_) + err := db.View(func (t *bolt.Tx) error { + return get(t, bucketBuildTasks, key, task_) + }) return task_, err } @@ -19,7 +22,14 @@ func (db *DB) GetTask(repo string, build int, task int) (*common.Task, error) { // the named repository and build number. func (db *DB) GetTaskLogs(repo string, build int, task int) ([]byte, error) { key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task)) - log, err := raw(db, bucketBuildLogs, key) + + var log []byte + err := db.View(func (t *bolt.Tx) error { + var err error + log, err = raw(t, bucketBuildLogs, key) + return err + }) + return log, err } @@ -62,7 +72,9 @@ func (db *DB) GetTaskList(repo string, build int) ([]*common.Task, error) { // repository and build number. func (db *DB) UpsertTask(repo string, build int, task *common.Task) error { key := []byte(repo + "/" + strconv.Itoa(build) + "/" + strconv.Itoa(task.Number)) - return update(db, bucketBuildTasks, key, task) + return db.Update(func (t *bolt.Tx) error { + return update(t, bucketBuildTasks, key, task) + }) } // UpsertTaskLogs inserts or updates a task logs for the diff --git a/datastore/bolt/token.go b/datastore/bolt/token.go index 4e48054e1..48390c23f 100644 --- a/datastore/bolt/token.go +++ b/datastore/bolt/token.go @@ -2,13 +2,18 @@ package bolt import ( "github.com/drone/drone/common" + "github.com/boltdb/bolt" ) // GetToken gets a token by sha value. func (db *DB) GetToken(sha string) (*common.Token, error) { token := &common.Token{} key := []byte(sha) - err := get(db, bucketTokens, key, token) + + err := db.View(func (t *bolt.Tx) error { + return get(t, bucketTokens, key, token) + }) + return token, err } @@ -16,12 +21,16 @@ func (db *DB) GetToken(sha string) (*common.Token, error) { // If the token already exists and error is returned. func (db *DB) InsertToken(token *common.Token) error { key := []byte(token.Sha) - return insert(db, bucketTokens, key, token) + return db.Update(func (t *bolt.Tx) error { + return insert(t, bucketTokens, key, token) + }) // TODO(bradrydzewski) add token to users_token index } // DeleteUser deletes the token. func (db *DB) DeleteToken(token *common.Token) error { key := []byte(token.Sha) - return delete(db, bucketUser, key) + return db.Update(func (t *bolt.Tx) error { + return delete(t, bucketUser, key) + }) } diff --git a/datastore/bolt/user.go b/datastore/bolt/user.go index 6d7d62738..89c08b656 100644 --- a/datastore/bolt/user.go +++ b/datastore/bolt/user.go @@ -4,13 +4,18 @@ import ( "time" "github.com/drone/drone/common" + "github.com/boltdb/bolt" ) // GetUser gets a user by user login. func (db *DB) GetUser(login string) (*common.User, error) { user := &common.User{} key := []byte(login) - err := get(db, bucketUser, key, user) + + err := db.View(func (t *bolt.Tx) error { + return get(t, bucketUser, key, user) + }) + return user, err } @@ -120,7 +125,10 @@ func (db *DB) GetUserList() ([]*common.User, error) { func (db *DB) UpdateUser(user *common.User) error { key := []byte(user.Login) user.Updated = time.Now().UTC().Unix() - return update(db, bucketUser, key, user) + + return db.Update(func (t *bolt.Tx) error { + return update(t, bucketUser, key, user) + }) } // InsertUser inserts a new user into the datastore. If @@ -129,7 +137,10 @@ func (db *DB) InsertUser(user *common.User) error { key := []byte(user.Login) user.Created = time.Now().UTC().Unix() user.Updated = time.Now().UTC().Unix() - return insert(db, bucketUser, key, user) + + return db.Update(func (t *bolt.Tx) error { + return insert(t, bucketUser, key, user) + }) } // DeleteUser deletes the user. @@ -137,5 +148,8 @@ func (db *DB) DeleteUser(user *common.User) error { key := []byte(user.Login) // TODO(bradrydzewski) delete user subscriptions // TODO(bradrydzewski) delete user tokens - return delete(db, bucketUser, key) + + return db.Update(func (t *bolt.Tx) error { + return delete(t, bucketUser, key) + }) } diff --git a/datastore/bolt/util.go b/datastore/bolt/util.go index baa73e610..1b3c8ae36 100644 --- a/datastore/bolt/util.go +++ b/datastore/bolt/util.go @@ -1,6 +1,9 @@ package bolt -import "github.com/youtube/vitess/go/bson" +import ( + "github.com/youtube/vitess/go/bson" + "github.com/boltdb/bolt" +) func encode(v interface{}) ([]byte, error) { return bson.Marshal(v) @@ -10,12 +13,7 @@ func decode(raw []byte, v interface{}) error { return bson.Unmarshal(raw, v) } -func get(db *DB, bucket, key []byte, v interface{}) error { - t, err := db.Begin(false) - if err != nil { - return err - } - defer t.Rollback() +func get(t *bolt.Tx, bucket, key []byte, v interface{}) error { raw := t.Bucket(bucket).Get(key) if raw == nil { return ErrKeyNotFound @@ -23,12 +21,7 @@ func get(db *DB, bucket, key []byte, v interface{}) error { return bson.Unmarshal(raw, v) } -func raw(db *DB, bucket, key []byte) ([]byte, error) { - t, err := db.Begin(false) - if err != nil { - return nil, err - } - defer t.Rollback() +func raw(t *bolt.Tx, bucket, key []byte) ([]byte, error) { raw := t.Bucket(bucket).Get(key) if raw == nil { return nil, ErrKeyNotFound @@ -36,11 +29,7 @@ func raw(db *DB, bucket, key []byte) ([]byte, error) { return raw, nil } -func update(db *DB, bucket, key []byte, v interface{}) error { - t, err := db.Begin(true) - if err != nil { - return err - } +func update(t *bolt.Tx, bucket, key []byte, v interface{}) error { raw, err := encode(v) if err != nil { t.Rollback() @@ -48,17 +37,12 @@ func update(db *DB, bucket, key []byte, v interface{}) error { } err = t.Bucket(bucket).Put(key, raw) if err != nil { - t.Rollback() return err } - return t.Commit() + return nil } -func insert(db *DB, bucket, key []byte, v interface{}) error { - t, err := db.Begin(true) - if err != nil { - return err - } +func insert(t *bolt.Tx, bucket, key []byte, v interface{}) error { raw, err := encode(v) if err != nil { t.Rollback() @@ -67,26 +51,19 @@ func insert(db *DB, bucket, key []byte, v interface{}) error { // verify the key does not already exists // in the bucket. If exists, fail if t.Bucket(bucket).Get(key) != nil { - t.Rollback() return ErrKeyExists } err = t.Bucket(bucket).Put(key, raw) if err != nil { - t.Rollback() return err } - return t.Commit() + return nil } -func delete(db *DB, bucket, key []byte) error { - t, err := db.Begin(true) +func delete(t *bolt.Tx, bucket, key []byte) error { + err := t.Bucket(bucket).Delete(key) if err != nil { return err } - err = t.Bucket(bucket).Delete(key) - if err != nil { - t.Rollback() - return err - } - return t.Commit() + return nil }