1
0
mirror of https://github.com/haiwen/seafile-server.git synced 2025-04-28 11:24:48 +00:00
seafile-server/fileserver/sync_api.go
2023-10-16 10:06:40 +08:00

1427 lines
36 KiB
Go

package main
import (
"bytes"
"context"
"database/sql"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"html"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
jwt "github.com/dgrijalva/jwt-go"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/haiwen/seafile-server/fileserver/blockmgr"
"github.com/haiwen/seafile-server/fileserver/commitmgr"
"github.com/haiwen/seafile-server/fileserver/diff"
"github.com/haiwen/seafile-server/fileserver/fsmgr"
"github.com/haiwen/seafile-server/fileserver/option"
"github.com/haiwen/seafile-server/fileserver/repomgr"
"github.com/haiwen/seafile-server/fileserver/share"
"github.com/haiwen/seafile-server/fileserver/utils"
"github.com/haiwen/seafile-server/fileserver/workerpool"
log "github.com/sirupsen/logrus"
)
type checkExistType int32
const (
checkFSExist checkExistType = 0
checkBlockExist checkExistType = 1
)
const (
seafileServerChannelEvent = "seaf_server.event"
seafileServerChannelStats = "seaf_server.stats"
emptySHA1 = "0000000000000000000000000000000000000000"
tokenExpireTime = 7200
permExpireTime = 7200
virtualRepoExpireTime = 7200
syncAPICleaningIntervalSec = 300
maxObjectPackSize = 1 << 20 // 1MB
fsIdWorkers = 10
)
var (
tokenCache sync.Map
permCache sync.Map
virtualRepoInfoCache sync.Map
calFsIdPool *workerpool.WorkPool
)
type tokenInfo struct {
repoID string
email string
expireTime int64
}
type permInfo struct {
perm string
expireTime int64
}
type virtualRepoInfo struct {
storeID string
expireTime int64
}
type repoEventData struct {
eType string
user string
ip string
repoID string
path string
clientName string
}
type statusEventData struct {
eType string
user string
repoID string
bytes uint64
}
func syncAPIInit() {
ticker := time.NewTicker(time.Second * syncAPICleaningIntervalSec)
go RecoverWrapper(func() {
for range ticker.C {
removeSyncAPIExpireCache()
}
})
calFsIdPool = workerpool.CreateWorkerPool(getFsId, fsIdWorkers)
}
type calResult struct {
user string
err *appError
}
func getFsId(args ...interface{}) error {
if len(args) < 3 {
return nil
}
resChan := args[0].(chan *calResult)
rsp := args[1].(http.ResponseWriter)
r := args[2].(*http.Request)
queries := r.URL.Query()
serverHead := queries.Get("server-head")
if !utils.IsObjectIDValid(serverHead) {
msg := "Invalid server-head parameter."
appErr := &appError{nil, msg, http.StatusBadRequest}
resChan <- &calResult{"", appErr}
return nil
}
clientHead := queries.Get("client-head")
if clientHead != "" && !utils.IsObjectIDValid(clientHead) {
msg := "Invalid client-head parameter."
appErr := &appError{nil, msg, http.StatusBadRequest}
resChan <- &calResult{"", appErr}
return nil
}
dirOnlyArg := queries.Get("dir-only")
var dirOnly bool
if dirOnlyArg != "" {
dirOnly = true
}
vars := mux.Vars(r)
repoID := vars["repoid"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
resChan <- &calResult{user, appErr}
return nil
}
appErr = checkPermission(repoID, user, "download", false)
if appErr != nil {
resChan <- &calResult{user, appErr}
return nil
}
repo := repomgr.Get(repoID)
if repo == nil {
err := fmt.Errorf("Failed to find repo %.8s", repoID)
appErr := &appError{err, "", http.StatusInternalServerError}
resChan <- &calResult{user, appErr}
return nil
}
ret, err := calculateSendObjectList(r.Context(), repo, serverHead, clientHead, dirOnly)
if err != nil {
if !errors.Is(err, context.Canceled) {
err := fmt.Errorf("Failed to get fs id list: %w", err)
appErr := &appError{err, "", http.StatusInternalServerError}
resChan <- &calResult{user, appErr}
return nil
}
appErr := &appError{nil, "", http.StatusInternalServerError}
resChan <- &calResult{user, appErr}
return nil
}
var objList []byte
if ret != nil {
objList, err = json.Marshal(ret)
if err != nil {
appErr := &appError{err, "", http.StatusInternalServerError}
resChan <- &calResult{user, appErr}
return nil
}
} else {
// when get obj list is nil, return []
objList = []byte{'[', ']'}
}
rsp.Header().Set("Content-Length", strconv.Itoa(len(objList)))
rsp.WriteHeader(http.StatusOK)
rsp.Write(objList)
resChan <- &calResult{user, nil}
return nil
}
func permissionCheckCB(rsp http.ResponseWriter, r *http.Request) *appError {
queries := r.URL.Query()
op := queries.Get("op")
if op != "download" && op != "upload" {
msg := "op is invalid"
return &appError{nil, msg, http.StatusBadRequest}
}
clientID := queries.Get("client_id")
if clientID != "" && len(clientID) != 40 {
msg := "client_id is invalid"
return &appError{nil, msg, http.StatusBadRequest}
}
clientVer := queries.Get("client_ver")
if clientVer != "" {
status := validateClientVer(clientVer)
if status != http.StatusOK {
msg := "client_ver is invalid"
return &appError{nil, msg, status}
}
}
clientName := queries.Get("client_name")
if clientName != "" {
clientName = html.UnescapeString(clientName)
}
vars := mux.Vars(r)
repoID := vars["repoid"]
repo := repomgr.GetEx(repoID)
if repo == nil {
msg := "repo was deleted"
return &appError{nil, msg, seafHTTPResRepoDeleted}
}
if repo.IsCorrupted {
msg := "repo was corrupted"
return &appError{nil, msg, seafHTTPResRepoCorrupted}
}
user, err := validateToken(r, repoID, true)
if err != nil {
return err
}
err = checkPermission(repoID, user, op, true)
if err != nil {
return err
}
ip := getClientIPAddr(r)
if ip == "" {
token := r.Header.Get("Seafile-Repo-Token")
err := fmt.Errorf("%s failed to get client ip", token)
return &appError{err, "", http.StatusInternalServerError}
}
if op == "download" {
onRepoOper("repo-download-sync", repoID, user, ip, clientName)
}
if clientID != "" && clientName != "" {
token := r.Header.Get("Seafile-Repo-Token")
exists, err := repomgr.TokenPeerInfoExists(token)
if err != nil {
err := fmt.Errorf("Failed to check whether token %s peer info exist: %v", token, err)
return &appError{err, "", http.StatusInternalServerError}
}
if !exists {
if err := repomgr.AddTokenPeerInfo(token, clientID, ip, clientName, clientVer, int64(time.Now().Unix())); err != nil {
err := fmt.Errorf("Failed to add token peer info: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
} else {
if err := repomgr.UpdateTokenPeerInfo(token, clientID, clientVer, int64(time.Now().Unix())); err != nil {
err := fmt.Errorf("Failed to update token peer info: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
}
}
return nil
}
func getBlockMapCB(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
fileID := vars["id"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
appErr = checkPermission(repoID, user, "download", false)
if appErr != nil {
return appErr
}
storeID, err := getRepoStoreID(repoID)
if err != nil {
err := fmt.Errorf("Failed to get repo store id by repo id %s: %v", repoID, err)
return &appError{err, "", http.StatusInternalServerError}
}
seafile, err := fsmgr.GetSeafile(storeID, fileID)
if err != nil {
msg := fmt.Sprintf("Failed to get seafile object by file id %s: %v", fileID, err)
return &appError{nil, msg, http.StatusNotFound}
}
var blockSizes []int64
for _, blockID := range seafile.BlkIDs {
blockSize, err := blockmgr.Stat(storeID, blockID)
if err != nil {
err := fmt.Errorf("Failed to find block %s/%s", storeID, blockID)
return &appError{err, "", http.StatusInternalServerError}
}
blockSizes = append(blockSizes, blockSize)
}
var data []byte
if blockSizes != nil {
data, err = json.Marshal(blockSizes)
if err != nil {
err := fmt.Errorf("Failed to marshal json: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
} else {
data = []byte{'[', ']'}
}
rsp.Header().Set("Content-Length", strconv.Itoa(len(data)))
rsp.WriteHeader(http.StatusOK)
rsp.Write(data)
return nil
}
func getAccessibleRepoListCB(rsp http.ResponseWriter, r *http.Request) *appError {
queries := r.URL.Query()
repoID := queries.Get("repo_id")
if repoID == "" || !utils.IsValidUUID(repoID) {
msg := "Invalid repo id."
return &appError{nil, msg, http.StatusBadRequest}
}
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
obtainedRepos := make(map[string]string)
repos, err := share.GetReposByOwner(user)
if err != nil {
err := fmt.Errorf("Failed to get repos by owner %s: %v", user, err)
return &appError{err, "", http.StatusInternalServerError}
}
var repoObjects []*share.SharedRepo
for _, repo := range repos {
if _, ok := obtainedRepos[repo.ID]; !ok {
obtainedRepos[repo.ID] = repo.ID
}
repo.Permission = "rw"
repo.Type = "repo"
repo.Owner = user
repoObjects = append(repoObjects, repo)
}
repos, err = share.ListShareRepos(user, "to_email")
if err != nil {
err := fmt.Errorf("Failed to get share repos by user %s: %v", user, err)
return &appError{err, "", http.StatusInternalServerError}
}
for _, sRepo := range repos {
if _, ok := obtainedRepos[sRepo.ID]; ok {
continue
}
sRepo.Type = "srepo"
sRepo.Owner = strings.ToLower(sRepo.Owner)
repoObjects = append(repoObjects, sRepo)
}
repos, err = share.GetGroupReposByUser(user, -1)
if err != nil {
err := fmt.Errorf("Failed to get group repos by user %s: %v", user, err)
return &appError{err, "", http.StatusInternalServerError}
}
reposTable := filterGroupRepos(repos)
for _, gRepo := range reposTable {
if _, ok := obtainedRepos[gRepo.ID]; ok {
continue
}
gRepo.Type = "grepo"
gRepo.Owner = strings.ToLower(gRepo.Owner)
repoObjects = append(repoObjects, gRepo)
}
repos, err = share.ListInnerPubRepos()
if err != nil {
err := fmt.Errorf("Failed to get inner public repos: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
for _, sRepo := range repos {
if _, ok := obtainedRepos[sRepo.ID]; ok {
continue
}
sRepo.Type = "grepo"
sRepo.Owner = "Organization"
repoObjects = append(repoObjects, sRepo)
}
var data []byte
if repoObjects != nil {
data, err = json.Marshal(repoObjects)
if err != nil {
err := fmt.Errorf("Failed to marshal json: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
} else {
data = []byte{'[', ']'}
}
rsp.Header().Set("Content-Length", strconv.Itoa(len(data)))
rsp.WriteHeader(http.StatusOK)
rsp.Write(data)
return nil
}
func filterGroupRepos(repos []*share.SharedRepo) map[string]*share.SharedRepo {
table := make(map[string]*share.SharedRepo)
for _, repo := range repos {
if repoPrev, ok := table[repo.ID]; ok {
if repo.Permission == "rw" && repoPrev.Permission == "r" {
table[repo.ID] = repo
}
} else {
table[repo.ID] = repo
}
}
return table
}
func recvFSCB(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
appErr = checkPermission(repoID, user, "upload", false)
if appErr != nil {
return appErr
}
storeID, err := getRepoStoreID(repoID)
if err != nil {
err := fmt.Errorf("Failed to get repo store id by repo id %s: %v", repoID, err)
return &appError{err, "", http.StatusInternalServerError}
}
fsBuf, err := ioutil.ReadAll(r.Body)
if err != nil {
return &appError{nil, err.Error(), http.StatusBadRequest}
}
for len(fsBuf) > 44 {
objID := string(fsBuf[:40])
if !utils.IsObjectIDValid(objID) {
msg := fmt.Sprintf("Fs obj id %s is invalid", objID)
return &appError{nil, msg, http.StatusBadRequest}
}
var objSize uint32
sizeBuffer := bytes.NewBuffer(fsBuf[40:44])
if err := binary.Read(sizeBuffer, binary.BigEndian, &objSize); err != nil {
msg := fmt.Sprintf("Failed to read fs obj size: %v", err)
return &appError{nil, msg, http.StatusBadRequest}
}
if len(fsBuf) < int(44+objSize) {
msg := "Request body size invalid"
return &appError{nil, msg, http.StatusBadRequest}
}
objBuffer := bytes.NewBuffer(fsBuf[44 : 44+objSize])
if err := fsmgr.WriteRaw(storeID, objID, objBuffer); err != nil {
err := fmt.Errorf("Failed to write fs obj %s:%s : %v", storeID, objID, err)
return &appError{err, "", http.StatusInternalServerError}
}
fsBuf = fsBuf[44+objSize:]
}
if len(fsBuf) == 0 {
rsp.WriteHeader(http.StatusOK)
return nil
}
msg := "Request body size invalid"
return &appError{nil, msg, http.StatusBadRequest}
}
func checkFSCB(rsp http.ResponseWriter, r *http.Request) *appError {
return postCheckExistCB(rsp, r, checkFSExist)
}
func checkBlockCB(rsp http.ResponseWriter, r *http.Request) *appError {
return postCheckExistCB(rsp, r, checkBlockExist)
}
func postCheckExistCB(rsp http.ResponseWriter, r *http.Request, existType checkExistType) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
appErr = checkPermission(repoID, user, "download", false)
if appErr != nil {
return appErr
}
storeID, err := getRepoStoreID(repoID)
if err != nil {
err := fmt.Errorf("Failed to get repo store id by repo id %s: %v", repoID, err)
return &appError{err, "", http.StatusInternalServerError}
}
var objIDList []string
if err := json.NewDecoder(r.Body).Decode(&objIDList); err != nil {
return &appError{nil, err.Error(), http.StatusBadRequest}
}
var neededObjs []string
var ret bool
for i := 0; i < len(objIDList); i++ {
if !utils.IsObjectIDValid(objIDList[i]) {
continue
}
if existType == checkFSExist {
ret, _ = fsmgr.Exists(storeID, objIDList[i])
} else if existType == checkBlockExist {
ret = blockmgr.Exists(storeID, objIDList[i])
}
if !ret {
neededObjs = append(neededObjs, objIDList[i])
}
}
var data []byte
if neededObjs != nil {
data, err = json.Marshal(neededObjs)
if err != nil {
err := fmt.Errorf("Failed to marshal json: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
} else {
data = []byte{'[', ']'}
}
rsp.Header().Set("Content-Length", strconv.Itoa(len(data)))
rsp.WriteHeader(http.StatusOK)
rsp.Write(data)
return nil
}
func packFSCB(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
appErr = checkPermission(repoID, user, "download", false)
if appErr != nil {
return appErr
}
storeID, err := getRepoStoreID(repoID)
if err != nil {
err := fmt.Errorf("Failed to get repo store id by repo id %s: %v", repoID, err)
return &appError{err, "", http.StatusInternalServerError}
}
var fsIDList []string
if err := json.NewDecoder(r.Body).Decode(&fsIDList); err != nil {
return &appError{nil, err.Error(), http.StatusBadRequest}
}
var totalSize int
var data bytes.Buffer
for i := 0; i < len(fsIDList); i++ {
if !utils.IsObjectIDValid(fsIDList[i]) {
msg := fmt.Sprintf("Invalid fs id %s", fsIDList[i])
return &appError{nil, msg, http.StatusBadRequest}
}
data.WriteString(fsIDList[i])
var tmp bytes.Buffer
if err := fsmgr.ReadRaw(storeID, fsIDList[i], &tmp); err != nil {
err := fmt.Errorf("Failed to read fs %s:%s: %v", storeID, fsIDList[i], err)
return &appError{err, "", http.StatusInternalServerError}
}
tmpLen := make([]byte, 4)
binary.BigEndian.PutUint32(tmpLen, uint32(tmp.Len()))
data.Write(tmpLen)
data.Write(tmp.Bytes())
totalSize += tmp.Len()
if totalSize >= maxObjectPackSize {
break
}
}
rsp.Header().Set("Content-Length", strconv.Itoa(data.Len()))
rsp.WriteHeader(http.StatusOK)
rsp.Write(data.Bytes())
return nil
}
func headCommitsMultiCB(rsp http.ResponseWriter, r *http.Request) *appError {
var repoIDList []string
if err := json.NewDecoder(r.Body).Decode(&repoIDList); err != nil {
return &appError{err, "", http.StatusBadRequest}
}
if len(repoIDList) == 0 {
return &appError{nil, "", http.StatusBadRequest}
}
var repoIDs strings.Builder
for i := 0; i < len(repoIDList); i++ {
if !utils.IsValidUUID(repoIDList[i]) {
return &appError{nil, "", http.StatusBadRequest}
}
if i == 0 {
repoIDs.WriteString(fmt.Sprintf("'%s'", repoIDList[i]))
} else {
repoIDs.WriteString(fmt.Sprintf(",'%s'", repoIDList[i]))
}
}
mode := ""
if strings.EqualFold(dbType, "mysql") {
mode = "LOCK IN SHARE MODE"
}
sqlStr := fmt.Sprintf(
"SELECT repo_id, commit_id FROM Branch WHERE name='master' AND "+
"repo_id IN (%s) %s",
repoIDs.String(), mode)
rows, err := seafileDB.Query(sqlStr)
if err != nil {
err := fmt.Errorf("Failed to get commit id: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
defer rows.Close()
commitIDMap := make(map[string]string)
var repoID string
var commitID string
for rows.Next() {
if err := rows.Scan(&repoID, &commitID); err == nil {
commitIDMap[repoID] = commitID
}
}
if err := rows.Err(); err != nil {
err := fmt.Errorf("Failed to get commit id: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
data, err := json.Marshal(commitIDMap)
if err != nil {
err := fmt.Errorf("Failed to marshal json: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
rsp.Header().Set("Content-Length", strconv.Itoa(len(data)))
rsp.WriteHeader(http.StatusOK)
rsp.Write(data)
return nil
}
func getCheckQuotaCB(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
if _, err := validateToken(r, repoID, false); err != nil {
return err
}
queries := r.URL.Query()
delta := queries.Get("delta")
if delta == "" {
msg := "Invalid delta parameter"
return &appError{nil, msg, http.StatusBadRequest}
}
deltaNum, err := strconv.ParseInt(delta, 10, 64)
if err != nil {
msg := "Invalid delta parameter"
return &appError{nil, msg, http.StatusBadRequest}
}
ret, err := checkQuota(repoID, deltaNum)
if err != nil {
msg := "Internal error.\n"
err := fmt.Errorf("failed to check quota: %v", err)
return &appError{err, msg, http.StatusInternalServerError}
}
if ret == 1 {
msg := "Out of quota.\n"
return &appError{nil, msg, seafHTTPResNoQuota}
}
return nil
}
type MyClaims struct {
Exp int64
RepoID string `json:"repo_id"`
UserName string `json:"username"`
}
func (*MyClaims) Valid() error {
return nil
}
func getJWTTokenCB(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
if !option.EnableNotification {
err := fmt.Errorf("notification server is not enabled")
return &appError{err, "", http.StatusInternalServerError}
}
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
tokenString, err := genJWTToken(repoID, user)
if err != nil {
return &appError{err, "", http.StatusInternalServerError}
}
data := fmt.Sprintf("{\"jwt_token\":\"%s\"}", tokenString)
rsp.Write([]byte(data))
return nil
}
func genJWTToken(repoID, user string) (string, error) {
claims := MyClaims{
time.Now().Add(time.Hour * 72).Unix(),
repoID,
user,
}
token := jwt.NewWithClaims(jwt.GetSigningMethod("HS256"), &claims)
tokenString, err := token.SignedString([]byte(option.PrivateKey))
if err != nil {
err := fmt.Errorf("failed to gen jwt token for repo %s", repoID)
return "", err
}
return tokenString, nil
}
func isValidUUID(u string) bool {
_, err := uuid.Parse(u)
return err == nil
}
func getFsObjIDCB(rsp http.ResponseWriter, r *http.Request) *appError {
recvChan := make(chan *calResult)
calFsIdPool.AddTask(recvChan, rsp, r)
result := <-recvChan
return result.err
}
func headCommitOperCB(rsp http.ResponseWriter, r *http.Request) *appError {
if r.Method == http.MethodGet {
return getHeadCommit(rsp, r)
} else if r.Method == http.MethodPut {
return putUpdateBranchCB(rsp, r)
}
return &appError{nil, "", http.StatusBadRequest}
}
func commitOperCB(rsp http.ResponseWriter, r *http.Request) *appError {
if r.Method == http.MethodGet {
return getCommitInfo(rsp, r)
} else if r.Method == http.MethodPut {
return putCommitCB(rsp, r)
}
return &appError{nil, "", http.StatusBadRequest}
}
func blockOperCB(rsp http.ResponseWriter, r *http.Request) *appError {
if r.Method == http.MethodGet {
return getBlockInfo(rsp, r)
} else if r.Method == http.MethodPut {
return putSendBlockCB(rsp, r)
}
return &appError{nil, "", http.StatusBadRequest}
}
func putSendBlockCB(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
blockID := vars["id"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
appErr = checkPermission(repoID, user, "upload", false)
if appErr != nil {
return appErr
}
storeID, err := getRepoStoreID(repoID)
if err != nil {
err := fmt.Errorf("Failed to get repo store id by repo id %s: %v", repoID, err)
return &appError{err, "", http.StatusInternalServerError}
}
if err := blockmgr.Write(storeID, blockID, r.Body); err != nil {
err := fmt.Errorf("Failed to close block %.8s:%s", storeID, blockID)
return &appError{err, "", http.StatusInternalServerError}
}
sendStatisticMsg(storeID, user, "sync-file-upload", uint64(r.ContentLength))
return nil
}
func getBlockInfo(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
blockID := vars["id"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
appErr = checkPermission(repoID, user, "download", false)
if appErr != nil {
return appErr
}
storeID, err := getRepoStoreID(repoID)
if err != nil {
err := fmt.Errorf("Failed to get repo store id by repo id %s: %v", repoID, err)
return &appError{err, "", http.StatusInternalServerError}
}
blockSize, err := blockmgr.Stat(storeID, blockID)
if err != nil {
return &appError{err, "", http.StatusInternalServerError}
}
if blockSize <= 0 {
err := fmt.Errorf("block %.8s:%s size invalid", storeID, blockID)
return &appError{err, "", http.StatusInternalServerError}
}
blockLen := fmt.Sprintf("%d", blockSize)
rsp.Header().Set("Content-Length", blockLen)
if err := blockmgr.Read(storeID, blockID, rsp); err != nil {
if !isNetworkErr(err) {
log.Printf("failed to read block %s: %v", blockID, err)
}
return nil
}
sendStatisticMsg(storeID, user, "sync-file-download", uint64(blockSize))
return nil
}
func getRepoStoreID(repoID string) (string, error) {
var storeID string
if value, ok := virtualRepoInfoCache.Load(repoID); ok {
if info, ok := value.(*virtualRepoInfo); ok {
if info.storeID != "" {
storeID = info.storeID
} else {
storeID = repoID
}
info.expireTime = time.Now().Unix() + virtualRepoExpireTime
}
}
if storeID != "" {
return storeID, nil
}
var vInfo virtualRepoInfo
var rID, originRepoID sql.NullString
sqlStr := "SELECT repo_id, origin_repo FROM VirtualRepo where repo_id = ?"
row := seafileDB.QueryRow(sqlStr, repoID)
if err := row.Scan(&rID, &originRepoID); err != nil {
if err == sql.ErrNoRows {
vInfo.storeID = repoID
vInfo.expireTime = time.Now().Unix() + virtualRepoExpireTime
virtualRepoInfoCache.Store(repoID, &vInfo)
return repoID, nil
}
return "", err
}
if !rID.Valid || !originRepoID.Valid {
return "", nil
}
vInfo.storeID = originRepoID.String
vInfo.expireTime = time.Now().Unix() + virtualRepoExpireTime
virtualRepoInfoCache.Store(repoID, &vInfo)
return originRepoID.String, nil
}
func sendStatisticMsg(repoID, user, operation string, bytes uint64) {
rData := &statusEventData{operation, user, repoID, bytes}
publishStatusEvent(rData)
}
func publishStatusEvent(rData *statusEventData) {
buf := fmt.Sprintf("%s\t%s\t%s\t%d",
rData.eType, rData.user,
rData.repoID, rData.bytes)
if _, err := rpcclient.Call("publish_event", seafileServerChannelStats, buf); err != nil {
log.Printf("Failed to publish event: %v", err)
}
}
func putCommitCB(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
commitID := vars["id"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
appErr = checkPermission(repoID, user, "upload", true)
if appErr != nil {
return appErr
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
return &appError{nil, err.Error(), http.StatusBadRequest}
}
commit := new(commitmgr.Commit)
if err := commit.FromData(data); err != nil {
return &appError{nil, err.Error(), http.StatusBadRequest}
}
if commit.RepoID != repoID {
msg := "The repo id in commit does not match current repo id"
return &appError{nil, msg, http.StatusBadRequest}
}
if err := commitmgr.Save(commit); err != nil {
err := fmt.Errorf("Failed to add commit %s: %v", commitID, err)
return &appError{err, "", http.StatusInternalServerError}
}
return nil
}
func getCommitInfo(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
commitID := vars["id"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
appErr = checkPermission(repoID, user, "download", false)
if appErr != nil {
return appErr
}
if exists, _ := commitmgr.Exists(repoID, commitID); !exists {
log.Printf("%s:%s is missing", repoID, commitID)
return &appError{nil, "", http.StatusNotFound}
}
var data bytes.Buffer
err := commitmgr.ReadRaw(repoID, commitID, &data)
if err != nil {
err := fmt.Errorf("Failed to read commit %s:%s: %v", repoID, commitID, err)
return &appError{err, "", http.StatusInternalServerError}
}
dataLen := strconv.Itoa(data.Len())
rsp.Header().Set("Content-Length", dataLen)
rsp.WriteHeader(http.StatusOK)
rsp.Write(data.Bytes())
return nil
}
func putUpdateBranchCB(rsp http.ResponseWriter, r *http.Request) *appError {
queries := r.URL.Query()
newCommitID := queries.Get("head")
if newCommitID == "" || !utils.IsObjectIDValid(newCommitID) {
msg := fmt.Sprintf("commit id %s is invalid", newCommitID)
return &appError{nil, msg, http.StatusBadRequest}
}
vars := mux.Vars(r)
repoID := vars["repoid"]
user, appErr := validateToken(r, repoID, false)
if appErr != nil {
return appErr
}
appErr = checkPermission(repoID, user, "upload", false)
if appErr != nil && appErr.Code == http.StatusForbidden {
return appErr
}
repo := repomgr.Get(repoID)
if repo == nil {
err := fmt.Errorf("Repo %s is missing or corrupted", repoID)
return &appError{err, "", http.StatusInternalServerError}
}
newCommit, err := commitmgr.Load(repoID, newCommitID)
if err != nil {
err := fmt.Errorf("Failed to get commit %s for repo %s", newCommitID, repoID)
return &appError{err, "", http.StatusInternalServerError}
}
base, err := commitmgr.Load(repoID, newCommit.ParentID.String)
if err != nil {
err := fmt.Errorf("Failed to get commit %s for repo %s", newCommit.ParentID.String, repoID)
return &appError{err, "", http.StatusInternalServerError}
}
ret, err := checkQuota(repoID, 0)
if err != nil {
err := fmt.Errorf("Failed to check quota: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
if ret == 1 {
msg := "Out of quota.\n"
return &appError{nil, msg, seafHTTPResNoQuota}
}
if err := fastForwardOrMerge(user, repo, base, newCommit); err != nil {
err := fmt.Errorf("Fast forward merge for repo %s is failed: %v", repoID, err)
return &appError{err, "", http.StatusInternalServerError}
}
go mergeVirtualRepoPool.AddTask(repoID, "")
go updateSizePool.AddTask(repoID)
rsp.WriteHeader(http.StatusOK)
return nil
}
func getHeadCommit(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
sqlStr := "SELECT 1 FROM Repo WHERE repo_id=?"
var exists bool
row := seafileDB.QueryRow(sqlStr, repoID)
if err := row.Scan(&exists); err != nil {
if err != sql.ErrNoRows {
log.Printf("DB error when check repo %s existence: %v", repoID, err)
msg := `{"is_corrupted": 1}`
rsp.WriteHeader(http.StatusOK)
rsp.Write([]byte(msg))
return nil
}
}
if !exists {
return &appError{nil, "", seafHTTPResRepoDeleted}
}
if _, err := validateToken(r, repoID, false); err != nil {
return err
}
var commitID string
sqlStr = "SELECT commit_id FROM Branch WHERE name='master' AND repo_id=?"
row = seafileDB.QueryRow(sqlStr, repoID)
if err := row.Scan(&commitID); err != nil {
if err != sql.ErrNoRows {
log.Printf("DB error when get branch master: %v", err)
msg := `{"is_corrupted": 1}`
rsp.WriteHeader(http.StatusOK)
rsp.Write([]byte(msg))
return nil
}
}
if commitID == "" {
return &appError{nil, "", http.StatusBadRequest}
}
msg := fmt.Sprintf("{\"is_corrupted\": 0, \"head_commit_id\": \"%s\"}", commitID)
rsp.WriteHeader(http.StatusOK)
rsp.Write([]byte(msg))
return nil
}
func checkPermission(repoID, user, op string, skipCache bool) *appError {
var info *permInfo
if !skipCache {
if value, ok := permCache.Load(fmt.Sprintf("%s:%s:%s", repoID, user, op)); ok {
info = value.(*permInfo)
}
}
if info != nil {
return nil
}
permCache.Delete(fmt.Sprintf("%s:%s:%s", repoID, user, op))
if op == "upload" {
status, err := repomgr.GetRepoStatus(repoID)
if err != nil {
msg := fmt.Sprintf("Failed to get repo status by repo id %s: %v", repoID, err)
return &appError{nil, msg, http.StatusForbidden}
}
if status != repomgr.RepoStatusNormal && status != -1 {
return &appError{nil, "", http.StatusForbidden}
}
}
perm := share.CheckPerm(repoID, user)
if perm != "" {
if perm == "r" && op == "upload" {
return &appError{nil, "", http.StatusForbidden}
}
info = new(permInfo)
info.perm = perm
info.expireTime = time.Now().Unix() + permExpireTime
permCache.Store(fmt.Sprintf("%s:%s:%s", repoID, user, op), info)
return nil
}
return &appError{nil, "", http.StatusForbidden}
}
func validateToken(r *http.Request, repoID string, skipCache bool) (string, *appError) {
token := r.Header.Get("Seafile-Repo-Token")
if token == "" {
msg := "token is null"
return "", &appError{nil, msg, http.StatusBadRequest}
}
if value, ok := tokenCache.Load(token); ok {
if info, ok := value.(*tokenInfo); ok {
if info.repoID != repoID {
msg := "Invalid token"
return "", &appError{nil, msg, http.StatusForbidden}
}
return info.email, nil
}
}
email, err := repomgr.GetEmailByToken(repoID, token)
if err != nil {
log.Printf("Failed to get email by token %s: %v", token, err)
tokenCache.Delete(token)
return email, &appError{err, "", http.StatusInternalServerError}
}
if email == "" {
msg := fmt.Sprintf("Failed to get email by token %s", token)
return email, &appError{nil, msg, http.StatusForbidden}
}
info := new(tokenInfo)
info.email = email
info.expireTime = time.Now().Unix() + tokenExpireTime
info.repoID = repoID
tokenCache.Store(token, info)
return email, nil
}
func validateClientVer(clientVer string) int {
versions := strings.Split(clientVer, ".")
if len(versions) != 3 {
return http.StatusBadRequest
}
if _, err := strconv.Atoi(versions[0]); err != nil {
return http.StatusBadRequest
}
if _, err := strconv.Atoi(versions[1]); err != nil {
return http.StatusBadRequest
}
if _, err := strconv.Atoi(versions[2]); err != nil {
return http.StatusBadRequest
}
return http.StatusOK
}
func getClientIPAddr(r *http.Request) string {
xForwardedFor := r.Header.Get("X-Forwarded-For")
addr := strings.TrimSpace(strings.Split(xForwardedFor, ",")[0])
ip := net.ParseIP(addr)
if ip != nil {
return ip.String()
}
addr = strings.TrimSpace(r.Header.Get("X-Real-Ip"))
ip = net.ParseIP(addr)
if ip != nil {
return ip.String()
}
if addr, _, err := net.SplitHostPort(strings.TrimSpace(r.RemoteAddr)); err == nil {
ip = net.ParseIP(addr)
if ip != nil {
return ip.String()
}
}
return ""
}
func onRepoOper(eType, repoID, user, ip, clientName string) {
rData := new(repoEventData)
vInfo, err := repomgr.GetVirtualRepoInfo(repoID)
if err != nil {
log.Printf("Failed to get virtual repo info by repo id %s: %v", repoID, err)
return
}
if vInfo != nil {
rData.repoID = vInfo.OriginRepoID
rData.path = vInfo.Path
} else {
rData.repoID = repoID
}
rData.eType = eType
rData.user = user
rData.ip = ip
rData.clientName = clientName
publishRepoEvent(rData)
}
func publishRepoEvent(rData *repoEventData) {
if rData.path == "" {
rData.path = "/"
}
buf := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%s",
rData.eType, rData.user, rData.ip,
rData.clientName, rData.repoID, rData.path)
if _, err := rpcclient.Call("publish_event", seafileServerChannelEvent, buf); err != nil {
log.Printf("Failed to publish event: %v", err)
}
}
func publishUpdateEvent(repoID string, commitID string) {
buf := fmt.Sprintf("repo-update\t%s\t%s", repoID, commitID)
if _, err := rpcclient.Call("publish_event", seafileServerChannelEvent, buf); err != nil {
log.Printf("Failed to publish event: %v", err)
}
}
func removeSyncAPIExpireCache() {
deleteTokens := func(key interface{}, value interface{}) bool {
if info, ok := value.(*tokenInfo); ok {
if info.expireTime <= time.Now().Unix() {
tokenCache.Delete(key)
}
}
return true
}
deletePerms := func(key interface{}, value interface{}) bool {
if info, ok := value.(*permInfo); ok {
if info.expireTime <= time.Now().Unix() {
permCache.Delete(key)
}
}
return true
}
deleteVirtualRepoInfo := func(key interface{}, value interface{}) bool {
if info, ok := value.(*virtualRepoInfo); ok {
if info.expireTime <= time.Now().Unix() {
virtualRepoInfoCache.Delete(key)
}
}
return true
}
tokenCache.Range(deleteTokens)
permCache.Range(deletePerms)
virtualRepoInfoCache.Range(deleteVirtualRepoInfo)
}
type collectFsInfo struct {
startTime int64
isTimeout bool
results []interface{}
}
var ErrTimeout = fmt.Errorf("get fs id list timeout")
func calculateSendObjectList(ctx context.Context, repo *repomgr.Repo, serverHead string, clientHead string, dirOnly bool) ([]interface{}, error) {
masterHead, err := commitmgr.Load(repo.ID, serverHead)
if err != nil {
err := fmt.Errorf("Failed to load server head commit %s:%s: %v", repo.ID, serverHead, err)
return nil, err
}
var remoteHead *commitmgr.Commit
remoteHeadRoot := emptySHA1
if clientHead != "" {
remoteHead, err = commitmgr.Load(repo.ID, clientHead)
if err != nil {
err := fmt.Errorf("Failed to load remote head commit %s:%s: %v", repo.ID, clientHead, err)
return nil, err
}
remoteHeadRoot = remoteHead.RootID
}
info := new(collectFsInfo)
info.startTime = time.Now().Unix()
if remoteHeadRoot != masterHead.RootID && masterHead.RootID != emptySHA1 {
info.results = append(info.results, masterHead.RootID)
}
var opt *diff.DiffOptions
if !dirOnly {
opt = &diff.DiffOptions{
FileCB: collectFileIDs,
DirCB: collectDirIDs,
Ctx: ctx,
RepoID: repo.StoreID}
opt.Data = info
} else {
opt = &diff.DiffOptions{
FileCB: collectFileIDsNOp,
DirCB: collectDirIDs,
Ctx: ctx,
RepoID: repo.StoreID}
opt.Data = info
}
trees := []string{masterHead.RootID, remoteHeadRoot}
if err := diff.DiffTrees(trees, opt); err != nil {
if info.isTimeout {
return nil, ErrTimeout
}
return nil, err
}
return info.results, nil
}
func collectFileIDs(ctx context.Context, baseDir string, files []*fsmgr.SeafDirent, data interface{}) error {
select {
case <-ctx.Done():
return context.Canceled
default:
}
file1 := files[0]
file2 := files[1]
info, ok := data.(*collectFsInfo)
if !ok {
err := fmt.Errorf("failed to assert results")
return err
}
if file1 != nil &&
(file2 == nil || file1.ID != file2.ID) &&
file1.ID != emptySHA1 {
info.results = append(info.results, file1.ID)
}
return nil
}
func collectFileIDsNOp(ctx context.Context, baseDir string, files []*fsmgr.SeafDirent, data interface{}) error {
return nil
}
func collectDirIDs(ctx context.Context, baseDir string, dirs []*fsmgr.SeafDirent, data interface{}, recurse *bool) error {
select {
case <-ctx.Done():
return context.Canceled
default:
}
info, ok := data.(*collectFsInfo)
if !ok {
err := fmt.Errorf("failed to assert fs info")
return err
}
dir1 := dirs[0]
dir2 := dirs[1]
if dir1 != nil &&
(dir2 == nil || dir1.ID != dir2.ID) &&
dir1.ID != emptySHA1 {
info.results = append(info.results, dir1.ID)
}
if option.FsIdListRequestTimeout > 0 {
now := time.Now().Unix()
if now-info.startTime > option.FsIdListRequestTimeout {
info.isTimeout = true
return ErrTimeout
}
}
return nil
}