1
0
mirror of https://github.com/haiwen/seafile-server.git synced 2025-04-27 19:15:07 +00:00

Retry upload when detecting concurrent uploads (#564)

* Retry upload when detecting concurrent uploads

* Check conflict when update branch and set concurrent upload error

Co-authored-by: 杨赫然 <heran.yang@seafile.com>
This commit is contained in:
feiniks 2022-07-15 14:45:36 +08:00 committed by GitHub
parent 3a47cf7ab2
commit 8c9043d2c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 286 additions and 24 deletions

View File

@ -22,5 +22,6 @@
#define POST_FILE_ERR_FILENAME 517
#define POST_FILE_ERR_BLOCK_MISSING 518
#define POST_FILE_ERR_QUOTA_FULL 519
#define SEAF_ERR_CONCURRENT_UPLOAD 520
#endif

View File

@ -171,7 +171,7 @@ start_index_task (gpointer data, gpointer user_data)
id_list = g_list_reverse (id_list);
size_list = g_list_reverse (size_list);
ret = post_files_and_gen_commit (idx_para->filenames,
idx_para->repo,
idx_para->repo->id,
idx_para->user,
idx_para->ret_json ? &ret_json : NULL,
idx_para->replace_existed,

View File

@ -900,7 +900,7 @@ seaf_repo_manager_set_subdir_group_perm_by_path (SeafRepoManager *mgr,
int
post_files_and_gen_commit (GList *filenames,
SeafRepo *repo,
const char *repo_id,
const char *user,
char **ret_json,
int replace_existed,

View File

@ -44,7 +44,7 @@ check_file_count_and_size (SeafRepo *repo, SeafDirent *dent, gint64 total_files,
int
post_files_and_gen_commit (GList *filenames,
SeafRepo *repo,
const char *repo_id,
const char *user,
char **ret_json,
int replace_existed,
@ -455,6 +455,7 @@ gen_new_commit (const char *repo_id,
const char *user,
const char *desc,
char *new_commit_id,
gboolean retry_on_conflict,
GError **error)
{
#define MAX_RETRY_COUNT 3
@ -568,6 +569,11 @@ retry:
repo->head,
current_head->commit_id) < 0)
{
if (!retry_on_conflict) {
g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_CONCURRENT_UPLOAD, "Concurrent upload");
ret = -1;
goto out;
}
seaf_repo_unref (repo);
repo = NULL;
seaf_commit_unref (current_head);
@ -632,6 +638,7 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr,
SeafDirent *new_dent = NULL;
char hex[41];
int ret = 0;
int retry_cnt = 0;
if (g_access (temp_file_path, R_OK) != 0) {
seaf_warning ("[post file] File %s doesn't exist or not readable.\n",
@ -695,6 +702,7 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr,
hex, STD_FILE_MODE, file_name,
(gint64)time(NULL), user, size);
retry:
root_id = do_post_file (repo,
head_commit->root_id, canon_path, new_dent);
if (!root_id) {
@ -708,9 +716,24 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr,
snprintf(buf, SEAF_PATH_MAX, "Added \"%s\"", file_name);
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0) {
ret = -1;
goto out;
user, buf, NULL, FALSE, error) < 0) {
if (*error == NULL || (*error)->code != SEAF_ERR_CONCURRENT_UPLOAD) {
ret = -1;
goto out;
}
retry_cnt++;
seaf_debug ("[post file] Concurrency upload retry :%d\n", retry_cnt);
/* Sleep random time between 0 and 3 seconds. */
usleep (g_random_int_range(0, 3) * 1000 * 1000);
seaf_repo_unref (repo);
seaf_commit_unref(head_commit);
GET_REPO_OR_FAIL(repo, repo_id);
GET_COMMIT_OR_FAIL(head_commit, repo->id, repo->version, repo->head->commit_id);
goto retry;
}
seaf_repo_manager_merge_virtual_repo (mgr, repo_id, NULL);
@ -1086,7 +1109,7 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr,
size_list = g_list_reverse (size_list);
ret = post_files_and_gen_commit (filenames,
repo,
repo->id,
user,
ret_json,
replace_existed,
@ -1124,7 +1147,7 @@ out:
int
post_files_and_gen_commit (GList *filenames,
SeafRepo *repo,
const char *repo_id,
const char *user,
char **ret_json,
int replace_existed,
@ -1133,14 +1156,18 @@ post_files_and_gen_commit (GList *filenames,
GList *size_list,
GError **error)
{
SeafRepo *repo = NULL;
GList *name_list = NULL;
GString *buf = g_string_new (NULL);
SeafCommit *head_commit = NULL;
char *root_id = NULL;
int ret = 0;
int retry_cnt = 0;
GET_REPO_OR_FAIL(repo, repo_id);
GET_COMMIT_OR_FAIL(head_commit, repo->id, repo->version, repo->head->commit_id);
retry:
/* Add the files to parent dir and commit. */
root_id = do_post_multi_files (repo, head_commit->root_id, canon_path,
filenames, id_list, size_list, user,
@ -1161,9 +1188,24 @@ post_files_and_gen_commit (GList *filenames,
g_string_printf (buf, "Added \"%s\".", (char *)(filenames->data));
if (gen_new_commit (repo->id, head_commit, root_id,
user, buf->str, NULL, error) < 0) {
ret = -1;
goto out;
user, buf->str, NULL, FALSE, error) < 0) {
if (*error == NULL || (*error)->code != SEAF_ERR_CONCURRENT_UPLOAD) {
ret = -1;
goto out;
}
retry_cnt++;
seaf_debug ("[post multi-file] Concurrency upload retry :%d\n", retry_cnt);
/* Sleep random time between 0 and 3 seconds. */
usleep (g_random_int_range(0, 3) * 1000 * 1000);
seaf_repo_unref (repo);
seaf_commit_unref(head_commit);
GET_REPO_OR_FAIL(repo, repo_id);
GET_COMMIT_OR_FAIL(head_commit, repo->id, repo->version, repo->head->commit_id);
goto retry;
}
seaf_repo_manager_merge_virtual_repo (seaf->repo_mgr, repo->id, NULL);
@ -1174,6 +1216,8 @@ post_files_and_gen_commit (GList *filenames,
update_repo_size(repo->id);
out:
if (repo)
seaf_repo_unref (repo);
if (head_commit)
seaf_commit_unref(head_commit);
string_list_free (name_list);
@ -1472,7 +1516,7 @@ seaf_repo_manager_commit_file_blocks (SeafRepoManager *mgr,
*new_id = g_strdup(hex);
snprintf(buf, SEAF_PATH_MAX, "Added \"%s\"", file_name);
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0)
user, buf, NULL, TRUE, error) < 0)
ret = -1;
out:
@ -1696,7 +1740,7 @@ seaf_repo_manager_del_file (SeafRepoManager *mgr,
}
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0) {
user, buf, NULL, TRUE, error) < 0) {
ret = -1;
goto out;
}
@ -1833,7 +1877,7 @@ put_dirent_and_commit (SeafRepo *repo,
}
if (gen_new_commit (repo->id, head_commit, root_id,
user, buf, NULL, error) < 0)
user, buf, NULL, TRUE, error) < 0)
ret = -1;
out:
@ -2676,7 +2720,7 @@ move_file_same_repo (const char *repo_id,
}
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0)
user, buf, NULL, TRUE, error) < 0)
ret = -1;
out:
@ -3416,7 +3460,7 @@ seaf_repo_manager_mkdir_with_parents (SeafRepoManager *mgr,
/* Commit. */
snprintf(buf, SEAF_PATH_MAX, "Added directory \"%s\"", relative_dir_can);
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0) {
user, buf, NULL, TRUE, error) < 0) {
ret = -1;
g_free (root_id);
goto out;
@ -3497,7 +3541,7 @@ seaf_repo_manager_post_dir (SeafRepoManager *mgr,
/* Commit. */
snprintf(buf, SEAF_PATH_MAX, "Added directory \"%s\"", new_dir_name);
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0) {
user, buf, NULL, TRUE, error) < 0) {
ret = -1;
goto out;
}
@ -3569,7 +3613,7 @@ seaf_repo_manager_post_empty_file (SeafRepoManager *mgr,
/* Commit. */
snprintf(buf, SEAF_PATH_MAX, "Added \"%s\"", new_file_name);
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0) {
user, buf, NULL, TRUE, error) < 0) {
ret = -1;
goto out;
}
@ -3762,7 +3806,7 @@ seaf_repo_manager_rename_file (SeafRepoManager *mgr,
}
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0) {
user, buf, NULL, TRUE, error) < 0) {
ret = -1;
goto out;
}
@ -3998,7 +4042,7 @@ seaf_repo_manager_put_file (SeafRepoManager *mgr,
/* Commit. */
snprintf(buf, SEAF_PATH_MAX, "Modified \"%s\"", file_name);
if (gen_new_commit (repo_id, head_commit, root_id, user, buf, NULL, error) < 0) {
if (gen_new_commit (repo_id, head_commit, root_id, user, buf, NULL, TRUE, error) < 0) {
ret = -1;
goto out;
}
@ -4080,7 +4124,7 @@ seaf_repo_manager_update_dir (SeafRepoManager *mgr,
commit_desc = g_strdup("Auto merge by system");
if (gen_new_commit (repo_id, head_commit, new_dir_id,
user, commit_desc, new_commit_id, error) < 0)
user, commit_desc, new_commit_id, TRUE, error) < 0)
ret = -1;
g_free (commit_desc);
goto out;
@ -4113,7 +4157,7 @@ seaf_repo_manager_update_dir (SeafRepoManager *mgr,
commit_desc = g_strdup("Auto merge by system");
if (gen_new_commit (repo_id, head_commit, root_id,
user, commit_desc, new_commit_id, error) < 0) {
user, commit_desc, new_commit_id, TRUE, error) < 0) {
ret = -1;
g_free (commit_desc);
goto out;
@ -4602,7 +4646,7 @@ seaf_repo_manager_revert_file (SeafRepoManager *mgr,
#endif
snprintf(buf, SEAF_PATH_MAX, "Reverted file \"%s\" to status at %s", filename, time_str);
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0) {
user, buf, NULL, TRUE, error) < 0) {
ret = -1;
goto out;
}
@ -4811,7 +4855,7 @@ seaf_repo_manager_revert_dir (SeafRepoManager *mgr,
/* Commit. */
snprintf(buf, SEAF_PATH_MAX, "Recovered deleted directory \"%s\"", dirname);
if (gen_new_commit (repo_id, head_commit, root_id,
user, buf, NULL, error) < 0) {
user, buf, NULL, TRUE, error) < 0) {
ret = -1;
goto out;
}

View File

@ -0,0 +1,6 @@
[account]
server = http://192.168.60.132
username = 123456@qq.com
password = 123456
repoid = e63f9fc8-880a-427f-b2c4-42c00538cb94
thread_num = 1000

8
tests/test_upload/go.mod Normal file
View File

@ -0,0 +1,8 @@
module test_upload
go 1.18
require (
github.com/haiwen/seafile-server/fileserver v0.0.0-20220621072834-faf434def97d // indirect
gopkg.in/ini.v1 v1.66.6 // indirect
)

29
tests/test_upload/go.sum Normal file
View File

@ -0,0 +1,29 @@
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/haiwen/seafile-server/fileserver v0.0.0-20220621072834-faf434def97d h1:7W5BeFzUFCx+xz5pINiuRJesr82pA2Gq0LZeHXBI0jE=
github.com/haiwen/seafile-server/fileserver v0.0.0-20220621072834-faf434def97d/go.mod h1:3r5rRrKrYibzy1quQOR0/yvT+7L+iuAFAwTcggCp6wg=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
gopkg.in/ini.v1 v1.55.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI=
gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=

View File

@ -0,0 +1 @@
go run test_upload.go -c accont.conf -p runtime

View File

@ -0,0 +1,173 @@
package main
import "fmt"
import "io"
import "sync"
import "flag"
import "log"
import "encoding/json"
import "bytes"
import "net/http"
import "mime/multipart"
import "path/filepath"
import "gopkg.in/ini.v1"
import "github.com/haiwen/seafile-server/fileserver/searpc"
type Options struct {
server string
username string
password string
repoID string
threadNum int
}
var confPath string
var rpcPipePath string
var options Options
var rpcclient *searpc.Client
func init() {
flag.StringVar(&confPath, "c", "", "config file path")
flag.StringVar(&rpcPipePath, "p", "", "rpc pipe path")
}
func main() {
flag.Parse()
pipePath := filepath.Join(rpcPipePath, "seafile.sock")
rpcclient = searpc.Init(pipePath, "seafserv-threaded-rpcserver")
config, err := ini.Load(confPath)
if err != nil {
log.Fatalf("Failed to load config file: %v", err)
}
section, err := config.GetSection("account")
if err != nil {
log.Fatal("No account section in config file.")
}
key, err := section.GetKey("server")
if err == nil {
options.server = key.String()
}
key, err = section.GetKey("username")
if err == nil {
options.username = key.String()
}
key, err = section.GetKey("password")
if err == nil {
options.password = key.String()
}
key, err = section.GetKey("repoid")
if err == nil {
options.repoID = key.String()
}
key, err = section.GetKey("thread_num")
if err == nil {
options.threadNum, _ = key.Int()
}
objID := "{\"parent_dir\":\"/\"}"
token, err := rpcclient.Call("seafile_web_get_access_token", options.repoID, objID, "upload", options.username, false)
if err != nil {
log.Fatal("Failed to get web access token\n")
}
accessToken, _ := token.(string)
url := fmt.Sprintf("%s:8082/upload-api/%s", options.server, accessToken)
content := []byte("123456")
var group sync.WaitGroup
for i := 0; i < options.threadNum; i++ {
group.Add(1)
go func(i int) {
values := make(map[string]io.Reader)
values["file"] = bytes.NewReader(content)
values["parent_dir"] = bytes.NewBuffer([]byte("/"))
// values["relative_path"] = bytes.NewBuffer([]byte(relativePath))
values["replace"] = bytes.NewBuffer([]byte("0"))
form, contentType, err := createForm(values, "111.md")
if err != nil {
log.Fatal("Failed to create multipart form: %v", err)
}
headers := make(map[string][]string)
headers["Content-Type"] = []string{contentType}
// headers["Authorization"] = []string{"Token " + accessToken.(string)}
status, body, err := HttpCommon("POST", url, headers, form)
log.Printf("[%d]upload status: %d return body: %s error: %v\n", i, status, string(body), err)
group.Done()
}(i)
}
group.Wait()
}
func createForm(values map[string]io.Reader, name string) (io.Reader, string, error) {
buf := new(bytes.Buffer)
w := multipart.NewWriter(buf)
defer w.Close()
for k, v := range values {
var fw io.Writer
var err error
if k == "file" {
if fw, err = w.CreateFormFile(k, name); err != nil {
return nil, "", err
}
} else {
if fw, err = w.CreateFormField(k); err != nil {
return nil, "", err
}
}
if _, err = io.Copy(fw, v); err != nil {
return nil, "", err
}
}
return buf, w.FormDataContentType(), nil
}
func HttpCommon(method, url string, header map[string][]string, reader io.Reader) (int, []byte, error) {
req, err := http.NewRequest(method, url, reader)
if err != nil {
return -1, nil, err
}
req.Header = header
rsp, err := http.DefaultClient.Do(req)
if err != nil {
return -1, nil, err
}
defer rsp.Body.Close()
if rsp.StatusCode == http.StatusNotFound {
return rsp.StatusCode, nil, fmt.Errorf("url %s not found", url)
}
body, err := io.ReadAll(rsp.Body)
if err != nil {
return rsp.StatusCode, nil, err
}
return rsp.StatusCode, body, nil
}
func getToken() string {
url := fmt.Sprintf("%s:8000/api2/auth-token/", options.server)
header := make(map[string][]string)
header["Content-Type"] = []string{"application/x-www-form-urlencoded"}
data := []byte(fmt.Sprintf("username=%s&password=%s", options.username, options.password))
_, body, err := HttpCommon("POST", url, header, bytes.NewReader(data))
if err != nil {
return ""
}
tokenMap := make(map[string]interface{})
err = json.Unmarshal(body, &tokenMap)
if err != nil {
return ""
}
token, _ := tokenMap["token"].(string)
return token
}