mirror of
https://github.com/haiwen/seafile-server.git
synced 2025-08-02 07:43:09 +00:00
Merge same repo once and check fs object is valid (#578)
Co-authored-by: 杨赫然 <heran.yang@seafile.com>
This commit is contained in:
parent
550e75cb0b
commit
04350f2b99
@ -7,10 +7,12 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/haiwen/seafile-server/fileserver/objstore"
|
||||
"github.com/haiwen/seafile-server/fileserver/utils"
|
||||
)
|
||||
|
||||
// Commit is a commit object
|
||||
@ -57,7 +59,9 @@ func NewCommit(repoID, parentID, newRoot, user, desc string) *Commit {
|
||||
commit.CreatorID = "0000000000000000000000000000000000000000"
|
||||
commit.Ctime = time.Now().Unix()
|
||||
commit.CommitID = computeCommitID(commit)
|
||||
commit.ParentID.SetValid(parentID)
|
||||
if parentID != "" {
|
||||
commit.ParentID.SetValid(parentID)
|
||||
}
|
||||
|
||||
return commit
|
||||
}
|
||||
@ -85,6 +89,22 @@ func (commit *Commit) FromData(p []byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if !utils.IsValidUUID(commit.RepoID) {
|
||||
return fmt.Errorf("repo id %s is invalid", commit.RepoID)
|
||||
}
|
||||
if !utils.IsObjectIDValid(commit.RootID) {
|
||||
return fmt.Errorf("root id %s is invalid", commit.RootID)
|
||||
}
|
||||
if len(commit.CreatorID) != 40 {
|
||||
return fmt.Errorf("creator id %s is invalid", commit.CreatorID)
|
||||
}
|
||||
if commit.ParentID.Valid && !utils.IsObjectIDValid(commit.ParentID.String) {
|
||||
return fmt.Errorf("parent id %s is invalid", commit.ParentID.String)
|
||||
}
|
||||
if commit.SecondParentID.Valid && !utils.IsObjectIDValid(commit.SecondParentID.String) {
|
||||
return fmt.Errorf("second parent id %s is invalid", commit.SecondParentID.String)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
const (
|
||||
commitID = "0401fc662e3bc87a41f299a907c056aaf8322a27"
|
||||
rootID = "6a1608dc2a1248838464e9b194800d35252e2ce3"
|
||||
repoID = "b1f2ad61-9164-418a-a47f-ab805dbd5694"
|
||||
seafileConfPath = "/tmp/conf"
|
||||
seafileDataDir = "/tmp/conf/seafile-data"
|
||||
@ -44,6 +45,7 @@ func TestCommit(t *testing.T) {
|
||||
newCommit := new(Commit)
|
||||
newCommit.CommitID = commitID
|
||||
newCommit.RepoID = repoID
|
||||
newCommit.RootID = rootID
|
||||
newCommit.CreatorName = "seafile"
|
||||
newCommit.CreatorID = commitID
|
||||
newCommit.Desc = "This is a commit"
|
||||
@ -57,11 +59,11 @@ func TestCommit(t *testing.T) {
|
||||
|
||||
commit, err := Load(repoID, commitID)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to load commit.\n")
|
||||
t.Errorf("Failed to load commit: %v.\n", err)
|
||||
}
|
||||
assertEqual(t, commit.CommitID, commitID)
|
||||
assertEqual(t, commit.RepoID, repoID)
|
||||
assertEqual(t, commit.CreatorName, "seafile")
|
||||
assertEqual(t, commit.CreatorID, commitID)
|
||||
assertEqual(t, commit.ParentID, commitID)
|
||||
assertEqual(t, commit.ParentID.String, commitID)
|
||||
}
|
||||
|
@ -255,8 +255,7 @@ func diffTestFileCB(ctx context.Context, baseDir string, files []*fsmgr.SeafDire
|
||||
}
|
||||
|
||||
if file1 != nil &&
|
||||
(file2 == nil || file1.ID != file2.ID) &&
|
||||
file1.ID != emptySHA1 {
|
||||
(file2 == nil || file1.ID != file2.ID) {
|
||||
*results = append(*results, file1.ID)
|
||||
}
|
||||
|
||||
@ -273,8 +272,7 @@ func diffTestDirCB(ctx context.Context, baseDir string, dirs []*fsmgr.SeafDirent
|
||||
}
|
||||
|
||||
if dir1 != nil &&
|
||||
(dir2 == nil || dir1.ID != dir2.ID) &&
|
||||
dir1.ID != emptySHA1 {
|
||||
(dir2 == nil || dir1.ID != dir2.ID) {
|
||||
*results = append(*results, dir1.ID)
|
||||
}
|
||||
|
||||
|
@ -2461,11 +2461,8 @@ func updateDir(repoID, dirPath, newDirID, user, headID string) (string, error) {
|
||||
err := fmt.Errorf("directory %s doesn't exist in repo %s", dirName, repo.StoreID)
|
||||
return "", err
|
||||
}
|
||||
newDent := new(fsmgr.SeafDirent)
|
||||
newDent.ID = newDirID
|
||||
newDent.Mode = (syscall.S_IFDIR | 0644)
|
||||
newDent.Mtime = time.Now().Unix()
|
||||
newDent.Name = dirName
|
||||
|
||||
newDent := fsmgr.NewDirent(newDirID, dirName, (syscall.S_IFDIR | 0644), time.Now().Unix(), "", 0)
|
||||
|
||||
rootID, err := doPutFile(repo, headCommit.RootID, canonPath, newDent)
|
||||
if err != nil || rootID == "" {
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"syscall"
|
||||
|
||||
"github.com/haiwen/seafile-server/fileserver/objstore"
|
||||
"github.com/haiwen/seafile-server/fileserver/utils"
|
||||
)
|
||||
|
||||
// Seafile is a file object
|
||||
@ -21,7 +22,7 @@ type Seafile struct {
|
||||
data []byte
|
||||
Version int `json:"version"`
|
||||
FileType int `json:"type"`
|
||||
FileID string `json:"file_id"`
|
||||
FileID string `json:"-"`
|
||||
FileSize uint64 `json:"size"`
|
||||
BlkIDs []string `json:"block_ids"`
|
||||
}
|
||||
@ -153,7 +154,7 @@ type SeafDir struct {
|
||||
data []byte
|
||||
Version int `json:"version"`
|
||||
DirType int `json:"type"`
|
||||
DirID string `json:"dir_id"`
|
||||
DirID string `json:"-"`
|
||||
Entries []*SeafDirent `json:"dirents"`
|
||||
}
|
||||
|
||||
@ -329,6 +330,21 @@ func (seafile *Seafile) FromData(p []byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if seafile.FileType != SeafMetadataTypeFile {
|
||||
return fmt.Errorf("object %s is not a file", seafile.FileID)
|
||||
}
|
||||
if seafile.Version < 1 {
|
||||
return fmt.Errorf("seafile object %s version should be > 0, version is %d", seafile.FileID, seafile.Version)
|
||||
}
|
||||
if seafile.BlkIDs == nil {
|
||||
return fmt.Errorf("no block id array in seafile object %s", seafile.FileID)
|
||||
}
|
||||
for _, blkID := range seafile.BlkIDs {
|
||||
if !utils.IsObjectIDValid(blkID) {
|
||||
return fmt.Errorf("block id %s is invalid", blkID)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -372,6 +388,20 @@ func (seafdir *SeafDir) FromData(p []byte) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if seafdir.DirType != SeafMetadataTypeDir {
|
||||
return fmt.Errorf("object %s is not a dir", seafdir.DirID)
|
||||
}
|
||||
if seafdir.Version < 1 {
|
||||
return fmt.Errorf("dir object %s version should be > 0, version is %d", seafdir.DirID, seafdir.Version)
|
||||
}
|
||||
if seafdir.Entries == nil {
|
||||
return fmt.Errorf("no dirents in dir object %s", seafdir.DirID)
|
||||
}
|
||||
for _, dent := range seafdir.Entries {
|
||||
if !utils.IsObjectIDValid(dent.ID) {
|
||||
return fmt.Errorf("dirent id %s is invalid", dent.ID)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -404,6 +434,8 @@ func GetSeafile(repoID string, fileID string) (*Seafile, error) {
|
||||
return seafile, nil
|
||||
}
|
||||
|
||||
seafile.FileID = fileID
|
||||
|
||||
err := ReadRaw(repoID, fileID, &buf)
|
||||
if err != nil {
|
||||
errors := fmt.Errorf("failed to read seafile object from storage : %v", err)
|
||||
@ -421,8 +453,6 @@ func GetSeafile(repoID string, fileID string) (*Seafile, error) {
|
||||
return nil, errors
|
||||
}
|
||||
|
||||
seafile.FileID = fileID
|
||||
|
||||
return seafile, nil
|
||||
}
|
||||
|
||||
@ -464,6 +494,8 @@ func GetSeafdir(repoID string, dirID string) (*SeafDir, error) {
|
||||
return seafdir, nil
|
||||
}
|
||||
|
||||
seafdir.DirID = dirID
|
||||
|
||||
err := ReadRaw(repoID, dirID, &buf)
|
||||
if err != nil {
|
||||
errors := fmt.Errorf("failed to read seafdir object from storage : %v", err)
|
||||
@ -481,8 +513,6 @@ func GetSeafdir(repoID string, dirID string) (*SeafDir, error) {
|
||||
return nil, errors
|
||||
}
|
||||
|
||||
seafdir.DirID = dirID
|
||||
|
||||
return seafdir, nil
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/haiwen/seafile-server/fileserver/blockmgr"
|
||||
"github.com/haiwen/seafile-server/fileserver/commitmgr"
|
||||
@ -24,6 +23,7 @@ import (
|
||||
"github.com/haiwen/seafile-server/fileserver/fsmgr"
|
||||
"github.com/haiwen/seafile-server/fileserver/repomgr"
|
||||
"github.com/haiwen/seafile-server/fileserver/share"
|
||||
"github.com/haiwen/seafile-server/fileserver/utils"
|
||||
"github.com/haiwen/seafile-server/fileserver/workerpool"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@ -114,7 +114,7 @@ func getFsId(args ...interface{}) error {
|
||||
queries := r.URL.Query()
|
||||
|
||||
serverHead := queries.Get("server-head")
|
||||
if !isObjectIDValid(serverHead) {
|
||||
if !utils.IsObjectIDValid(serverHead) {
|
||||
msg := "Invalid server-head parameter."
|
||||
appErr := &appError{nil, msg, http.StatusBadRequest}
|
||||
resChan <- &calResult{"", appErr}
|
||||
@ -122,7 +122,7 @@ func getFsId(args ...interface{}) error {
|
||||
}
|
||||
|
||||
clientHead := queries.Get("client-head")
|
||||
if clientHead != "" && !isObjectIDValid(clientHead) {
|
||||
if clientHead != "" && !utils.IsObjectIDValid(clientHead) {
|
||||
msg := "Invalid client-head parameter."
|
||||
appErr := &appError{nil, msg, http.StatusBadRequest}
|
||||
resChan <- &calResult{"", appErr}
|
||||
@ -323,7 +323,7 @@ func getAccessibleRepoListCB(rsp http.ResponseWriter, r *http.Request) *appError
|
||||
queries := r.URL.Query()
|
||||
repoID := queries.Get("repo_id")
|
||||
|
||||
if repoID == "" || !isValidUUID(repoID) {
|
||||
if repoID == "" || !utils.IsValidUUID(repoID) {
|
||||
msg := "Invalid repo id."
|
||||
return &appError{nil, msg, http.StatusBadRequest}
|
||||
}
|
||||
@ -457,7 +457,7 @@ func recvFSCB(rsp http.ResponseWriter, r *http.Request) *appError {
|
||||
|
||||
for len(fsBuf) > 44 {
|
||||
objID := string(fsBuf[:40])
|
||||
if !isObjectIDValid(objID) {
|
||||
if !utils.IsObjectIDValid(objID) {
|
||||
msg := fmt.Sprintf("Fs obj id %s is invalid", objID)
|
||||
return &appError{nil, msg, http.StatusBadRequest}
|
||||
}
|
||||
@ -524,7 +524,7 @@ func postCheckExistCB(rsp http.ResponseWriter, r *http.Request, existType checkE
|
||||
var neededObjs []string
|
||||
var ret bool
|
||||
for i := 0; i < len(objIDList); i++ {
|
||||
if !isObjectIDValid(objIDList[i]) {
|
||||
if !utils.IsObjectIDValid(objIDList[i]) {
|
||||
continue
|
||||
}
|
||||
if existType == checkFSExist {
|
||||
@ -581,7 +581,7 @@ func packFSCB(rsp http.ResponseWriter, r *http.Request) *appError {
|
||||
var totalSize int
|
||||
var data bytes.Buffer
|
||||
for i := 0; i < len(fsIDList); i++ {
|
||||
if !isObjectIDValid(fsIDList[i]) {
|
||||
if !utils.IsObjectIDValid(fsIDList[i]) {
|
||||
msg := fmt.Sprintf("Invalid fs id %s", fsIDList[i])
|
||||
return &appError{nil, msg, http.StatusBadRequest}
|
||||
}
|
||||
@ -619,7 +619,7 @@ func headCommitsMultiCB(rsp http.ResponseWriter, r *http.Request) *appError {
|
||||
|
||||
var repoIDs strings.Builder
|
||||
for i := 0; i < len(repoIDList); i++ {
|
||||
if !isValidUUID(repoIDList[i]) {
|
||||
if !utils.IsValidUUID(repoIDList[i]) {
|
||||
return &appError{nil, "", http.StatusBadRequest}
|
||||
}
|
||||
if i == 0 {
|
||||
@ -704,11 +704,6 @@ func getCheckQuotaCB(rsp http.ResponseWriter, r *http.Request) *appError {
|
||||
return nil
|
||||
}
|
||||
|
||||
func isValidUUID(u string) bool {
|
||||
_, err := uuid.Parse(u)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func getFsObjIDCB(rsp http.ResponseWriter, r *http.Request) *appError {
|
||||
recvChan := make(chan *calResult)
|
||||
|
||||
@ -945,7 +940,7 @@ func getCommitInfo(rsp http.ResponseWriter, r *http.Request) *appError {
|
||||
func putUpdateBranchCB(rsp http.ResponseWriter, r *http.Request) *appError {
|
||||
queries := r.URL.Query()
|
||||
newCommitID := queries.Get("head")
|
||||
if newCommitID == "" || !isObjectIDValid(newCommitID) {
|
||||
if newCommitID == "" || !utils.IsObjectIDValid(newCommitID) {
|
||||
msg := fmt.Sprintf("commit id %s is invalid", newCommitID)
|
||||
return &appError{nil, msg, http.StatusBadRequest}
|
||||
}
|
||||
@ -1338,17 +1333,3 @@ func collectDirIDs(ctx context.Context, baseDir string, dirs []*fsmgr.SeafDirent
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func isObjectIDValid(objID string) bool {
|
||||
if len(objID) != 40 {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < len(objID); i++ {
|
||||
c := objID[i]
|
||||
if (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') {
|
||||
continue
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
24
fileserver/utils/utils.go
Normal file
24
fileserver/utils/utils.go
Normal file
@ -0,0 +1,24 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func IsValidUUID(u string) bool {
|
||||
_, err := uuid.Parse(u)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func IsObjectIDValid(objID string) bool {
|
||||
if len(objID) != 40 {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < len(objID); i++ {
|
||||
c := objID[i]
|
||||
if (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') {
|
||||
continue
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"math/rand"
|
||||
@ -21,6 +22,9 @@ const mergeVirtualRepoWorkerNumber = 5
|
||||
|
||||
var mergeVirtualRepoPool *workerpool.WorkPool
|
||||
|
||||
var runningRepo = make(map[string]struct{})
|
||||
var runningRepoMutex sync.Mutex
|
||||
|
||||
func virtualRepoInit() {
|
||||
mergeVirtualRepoPool = workerpool.CreateWorkerPool(mergeVirtualRepo, mergeVirtualRepoWorkerNumber)
|
||||
}
|
||||
@ -36,10 +40,23 @@ func mergeVirtualRepo(args ...interface{}) error {
|
||||
}
|
||||
|
||||
if virtual {
|
||||
runningRepoMutex.Lock()
|
||||
if _, ok := runningRepo[repoID]; ok {
|
||||
log.Debugf("a task for repo %s is already running", repoID)
|
||||
go mergeVirtualRepoPool.AddTask(repoID)
|
||||
runningRepoMutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
runningRepo[repoID] = struct{}{}
|
||||
runningRepoMutex.Unlock()
|
||||
|
||||
err := mergeRepo(repoID)
|
||||
if err != nil {
|
||||
log.Printf("%v", err)
|
||||
}
|
||||
runningRepoMutex.Lock()
|
||||
delete(runningRepo, repoID)
|
||||
runningRepoMutex.Unlock()
|
||||
|
||||
go updateSizePool.AddTask(repoID)
|
||||
|
||||
@ -55,11 +72,23 @@ func mergeVirtualRepo(args ...interface{}) error {
|
||||
if id == excludeRepo {
|
||||
continue
|
||||
}
|
||||
runningRepoMutex.Lock()
|
||||
if _, ok := runningRepo[id]; ok {
|
||||
log.Debugf("a task for repo %s is already running", id)
|
||||
go mergeVirtualRepoPool.AddTask(id)
|
||||
runningRepoMutex.Unlock()
|
||||
continue
|
||||
}
|
||||
runningRepo[id] = struct{}{}
|
||||
runningRepoMutex.Unlock()
|
||||
|
||||
err := mergeRepo(id)
|
||||
if err != nil {
|
||||
log.Printf("%v", err)
|
||||
}
|
||||
runningRepoMutex.Lock()
|
||||
delete(runningRepo, id)
|
||||
runningRepoMutex.Unlock()
|
||||
}
|
||||
|
||||
go updateSizePool.AddTask(repoID)
|
||||
|
Loading…
Reference in New Issue
Block a user