diff --git a/include/seafile-error.h b/include/seafile-error.h index 8cb09ab..8a3e26d 100644 --- a/include/seafile-error.h +++ b/include/seafile-error.h @@ -22,5 +22,6 @@ #define POST_FILE_ERR_FILENAME 517 #define POST_FILE_ERR_BLOCK_MISSING 518 #define POST_FILE_ERR_QUOTA_FULL 519 +#define SEAF_ERR_CONCURRENT_UPLOAD 520 #endif diff --git a/server/index-blocks-mgr.c b/server/index-blocks-mgr.c index 9ee9820..e214678 100644 --- a/server/index-blocks-mgr.c +++ b/server/index-blocks-mgr.c @@ -171,7 +171,7 @@ start_index_task (gpointer data, gpointer user_data) id_list = g_list_reverse (id_list); size_list = g_list_reverse (size_list); ret = post_files_and_gen_commit (idx_para->filenames, - idx_para->repo, + idx_para->repo->id, idx_para->user, idx_para->ret_json ? &ret_json : NULL, idx_para->replace_existed, diff --git a/server/repo-mgr.h b/server/repo-mgr.h index f65b4ab..de79f8f 100644 --- a/server/repo-mgr.h +++ b/server/repo-mgr.h @@ -900,7 +900,7 @@ seaf_repo_manager_set_subdir_group_perm_by_path (SeafRepoManager *mgr, int post_files_and_gen_commit (GList *filenames, - SeafRepo *repo, + const char *repo_id, const char *user, char **ret_json, int replace_existed, diff --git a/server/repo-op.c b/server/repo-op.c index e43eab0..d03dd9c 100644 --- a/server/repo-op.c +++ b/server/repo-op.c @@ -44,7 +44,7 @@ check_file_count_and_size (SeafRepo *repo, SeafDirent *dent, gint64 total_files, int post_files_and_gen_commit (GList *filenames, - SeafRepo *repo, + const char *repo_id, const char *user, char **ret_json, int replace_existed, @@ -455,6 +455,7 @@ gen_new_commit (const char *repo_id, const char *user, const char *desc, char *new_commit_id, + gboolean retry_on_conflict, GError **error) { #define MAX_RETRY_COUNT 3 @@ -568,6 +569,11 @@ retry: repo->head, current_head->commit_id) < 0) { + if (!retry_on_conflict) { + g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_CONCURRENT_UPLOAD, "Concurrent upload"); + ret = -1; + goto out; + } seaf_repo_unref (repo); repo = NULL; seaf_commit_unref (current_head); @@ -632,6 +638,7 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr, SeafDirent *new_dent = NULL; char hex[41]; int ret = 0; + int retry_cnt = 0; if (g_access (temp_file_path, R_OK) != 0) { seaf_warning ("[post file] File %s doesn't exist or not readable.\n", @@ -695,6 +702,7 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr, hex, STD_FILE_MODE, file_name, (gint64)time(NULL), user, size); +retry: root_id = do_post_file (repo, head_commit->root_id, canon_path, new_dent); if (!root_id) { @@ -708,9 +716,24 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr, snprintf(buf, SEAF_PATH_MAX, "Added \"%s\"", file_name); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) { - ret = -1; - goto out; + user, buf, NULL, FALSE, error) < 0) { + if (*error == NULL || (*error)->code != SEAF_ERR_CONCURRENT_UPLOAD) { + ret = -1; + goto out; + } + + retry_cnt++; + seaf_debug ("[post file] Concurrency upload retry :%d\n", retry_cnt); + /* Sleep random time between 0 and 3 seconds. */ + usleep (g_random_int_range(0, 3) * 1000 * 1000); + + seaf_repo_unref (repo); + seaf_commit_unref(head_commit); + + GET_REPO_OR_FAIL(repo, repo_id); + GET_COMMIT_OR_FAIL(head_commit, repo->id, repo->version, repo->head->commit_id); + + goto retry; } seaf_repo_manager_merge_virtual_repo (mgr, repo_id, NULL); @@ -1086,7 +1109,7 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, size_list = g_list_reverse (size_list); ret = post_files_and_gen_commit (filenames, - repo, + repo->id, user, ret_json, replace_existed, @@ -1124,7 +1147,7 @@ out: int post_files_and_gen_commit (GList *filenames, - SeafRepo *repo, + const char *repo_id, const char *user, char **ret_json, int replace_existed, @@ -1133,14 +1156,18 @@ post_files_and_gen_commit (GList *filenames, GList *size_list, GError **error) { + SeafRepo *repo = NULL; GList *name_list = NULL; GString *buf = g_string_new (NULL); SeafCommit *head_commit = NULL; char *root_id = NULL; int ret = 0; + int retry_cnt = 0; + GET_REPO_OR_FAIL(repo, repo_id); GET_COMMIT_OR_FAIL(head_commit, repo->id, repo->version, repo->head->commit_id); +retry: /* Add the files to parent dir and commit. */ root_id = do_post_multi_files (repo, head_commit->root_id, canon_path, filenames, id_list, size_list, user, @@ -1161,9 +1188,24 @@ post_files_and_gen_commit (GList *filenames, g_string_printf (buf, "Added \"%s\".", (char *)(filenames->data)); if (gen_new_commit (repo->id, head_commit, root_id, - user, buf->str, NULL, error) < 0) { - ret = -1; - goto out; + user, buf->str, NULL, FALSE, error) < 0) { + if (*error == NULL || (*error)->code != SEAF_ERR_CONCURRENT_UPLOAD) { + ret = -1; + goto out; + } + + retry_cnt++; + seaf_debug ("[post multi-file] Concurrency upload retry :%d\n", retry_cnt); + + /* Sleep random time between 0 and 3 seconds. */ + usleep (g_random_int_range(0, 3) * 1000 * 1000); + + seaf_repo_unref (repo); + seaf_commit_unref(head_commit); + + GET_REPO_OR_FAIL(repo, repo_id); + GET_COMMIT_OR_FAIL(head_commit, repo->id, repo->version, repo->head->commit_id); + goto retry; } seaf_repo_manager_merge_virtual_repo (seaf->repo_mgr, repo->id, NULL); @@ -1174,6 +1216,8 @@ post_files_and_gen_commit (GList *filenames, update_repo_size(repo->id); out: + if (repo) + seaf_repo_unref (repo); if (head_commit) seaf_commit_unref(head_commit); string_list_free (name_list); @@ -1472,7 +1516,7 @@ seaf_repo_manager_commit_file_blocks (SeafRepoManager *mgr, *new_id = g_strdup(hex); snprintf(buf, SEAF_PATH_MAX, "Added \"%s\"", file_name); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) + user, buf, NULL, TRUE, error) < 0) ret = -1; out: @@ -1696,7 +1740,7 @@ seaf_repo_manager_del_file (SeafRepoManager *mgr, } if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) { + user, buf, NULL, TRUE, error) < 0) { ret = -1; goto out; } @@ -1833,7 +1877,7 @@ put_dirent_and_commit (SeafRepo *repo, } if (gen_new_commit (repo->id, head_commit, root_id, - user, buf, NULL, error) < 0) + user, buf, NULL, TRUE, error) < 0) ret = -1; out: @@ -2676,7 +2720,7 @@ move_file_same_repo (const char *repo_id, } if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) + user, buf, NULL, TRUE, error) < 0) ret = -1; out: @@ -3416,7 +3460,7 @@ seaf_repo_manager_mkdir_with_parents (SeafRepoManager *mgr, /* Commit. */ snprintf(buf, SEAF_PATH_MAX, "Added directory \"%s\"", relative_dir_can); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) { + user, buf, NULL, TRUE, error) < 0) { ret = -1; g_free (root_id); goto out; @@ -3497,7 +3541,7 @@ seaf_repo_manager_post_dir (SeafRepoManager *mgr, /* Commit. */ snprintf(buf, SEAF_PATH_MAX, "Added directory \"%s\"", new_dir_name); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) { + user, buf, NULL, TRUE, error) < 0) { ret = -1; goto out; } @@ -3569,7 +3613,7 @@ seaf_repo_manager_post_empty_file (SeafRepoManager *mgr, /* Commit. */ snprintf(buf, SEAF_PATH_MAX, "Added \"%s\"", new_file_name); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) { + user, buf, NULL, TRUE, error) < 0) { ret = -1; goto out; } @@ -3762,7 +3806,7 @@ seaf_repo_manager_rename_file (SeafRepoManager *mgr, } if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) { + user, buf, NULL, TRUE, error) < 0) { ret = -1; goto out; } @@ -3998,7 +4042,7 @@ seaf_repo_manager_put_file (SeafRepoManager *mgr, /* Commit. */ snprintf(buf, SEAF_PATH_MAX, "Modified \"%s\"", file_name); - if (gen_new_commit (repo_id, head_commit, root_id, user, buf, NULL, error) < 0) { + if (gen_new_commit (repo_id, head_commit, root_id, user, buf, NULL, TRUE, error) < 0) { ret = -1; goto out; } @@ -4080,7 +4124,7 @@ seaf_repo_manager_update_dir (SeafRepoManager *mgr, commit_desc = g_strdup("Auto merge by system"); if (gen_new_commit (repo_id, head_commit, new_dir_id, - user, commit_desc, new_commit_id, error) < 0) + user, commit_desc, new_commit_id, TRUE, error) < 0) ret = -1; g_free (commit_desc); goto out; @@ -4113,7 +4157,7 @@ seaf_repo_manager_update_dir (SeafRepoManager *mgr, commit_desc = g_strdup("Auto merge by system"); if (gen_new_commit (repo_id, head_commit, root_id, - user, commit_desc, new_commit_id, error) < 0) { + user, commit_desc, new_commit_id, TRUE, error) < 0) { ret = -1; g_free (commit_desc); goto out; @@ -4602,7 +4646,7 @@ seaf_repo_manager_revert_file (SeafRepoManager *mgr, #endif snprintf(buf, SEAF_PATH_MAX, "Reverted file \"%s\" to status at %s", filename, time_str); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) { + user, buf, NULL, TRUE, error) < 0) { ret = -1; goto out; } @@ -4811,7 +4855,7 @@ seaf_repo_manager_revert_dir (SeafRepoManager *mgr, /* Commit. */ snprintf(buf, SEAF_PATH_MAX, "Recovered deleted directory \"%s\"", dirname); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, error) < 0) { + user, buf, NULL, TRUE, error) < 0) { ret = -1; goto out; } diff --git a/tests/test_upload/account.conf b/tests/test_upload/account.conf new file mode 100644 index 0000000..328b070 --- /dev/null +++ b/tests/test_upload/account.conf @@ -0,0 +1,6 @@ +[account] +server = http://192.168.60.132 +username = 123456@qq.com +password = 123456 +repoid = e63f9fc8-880a-427f-b2c4-42c00538cb94 +thread_num = 1000 diff --git a/tests/test_upload/go.mod b/tests/test_upload/go.mod new file mode 100644 index 0000000..57888c6 --- /dev/null +++ b/tests/test_upload/go.mod @@ -0,0 +1,8 @@ +module test_upload + +go 1.18 + +require ( + github.com/haiwen/seafile-server/fileserver v0.0.0-20220621072834-faf434def97d // indirect + gopkg.in/ini.v1 v1.66.6 // indirect +) diff --git a/tests/test_upload/go.sum b/tests/test_upload/go.sum new file mode 100644 index 0000000..172a719 --- /dev/null +++ b/tests/test_upload/go.sum @@ -0,0 +1,29 @@ +github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= +github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/haiwen/seafile-server/fileserver v0.0.0-20220621072834-faf434def97d h1:7W5BeFzUFCx+xz5pINiuRJesr82pA2Gq0LZeHXBI0jE= +github.com/haiwen/seafile-server/fileserver v0.0.0-20220621072834-faf434def97d/go.mod h1:3r5rRrKrYibzy1quQOR0/yvT+7L+iuAFAwTcggCp6wg= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +gopkg.in/ini.v1 v1.55.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI= +gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/tests/test_upload/readme.md b/tests/test_upload/readme.md new file mode 100644 index 0000000..0747393 --- /dev/null +++ b/tests/test_upload/readme.md @@ -0,0 +1 @@ +go run test_upload.go -c accont.conf -p runtime diff --git a/tests/test_upload/test_upload.go b/tests/test_upload/test_upload.go new file mode 100644 index 0000000..e6029b6 --- /dev/null +++ b/tests/test_upload/test_upload.go @@ -0,0 +1,173 @@ +package main + +import "fmt" +import "io" +import "sync" +import "flag" +import "log" +import "encoding/json" +import "bytes" +import "net/http" +import "mime/multipart" +import "path/filepath" + +import "gopkg.in/ini.v1" +import "github.com/haiwen/seafile-server/fileserver/searpc" + +type Options struct { + server string + username string + password string + repoID string + threadNum int +} + +var confPath string +var rpcPipePath string +var options Options +var rpcclient *searpc.Client + +func init() { + flag.StringVar(&confPath, "c", "", "config file path") + flag.StringVar(&rpcPipePath, "p", "", "rpc pipe path") +} + +func main() { + flag.Parse() + + pipePath := filepath.Join(rpcPipePath, "seafile.sock") + rpcclient = searpc.Init(pipePath, "seafserv-threaded-rpcserver") + + config, err := ini.Load(confPath) + if err != nil { + log.Fatalf("Failed to load config file: %v", err) + } + section, err := config.GetSection("account") + if err != nil { + log.Fatal("No account section in config file.") + } + + key, err := section.GetKey("server") + if err == nil { + options.server = key.String() + } + + key, err = section.GetKey("username") + if err == nil { + options.username = key.String() + } + + key, err = section.GetKey("password") + if err == nil { + options.password = key.String() + } + key, err = section.GetKey("repoid") + if err == nil { + options.repoID = key.String() + } + key, err = section.GetKey("thread_num") + if err == nil { + options.threadNum, _ = key.Int() + } + + objID := "{\"parent_dir\":\"/\"}" + token, err := rpcclient.Call("seafile_web_get_access_token", options.repoID, objID, "upload", options.username, false) + if err != nil { + log.Fatal("Failed to get web access token\n") + } + accessToken, _ := token.(string) + + url := fmt.Sprintf("%s:8082/upload-api/%s", options.server, accessToken) + content := []byte("123456") + + var group sync.WaitGroup + for i := 0; i < options.threadNum; i++ { + group.Add(1) + go func(i int) { + values := make(map[string]io.Reader) + values["file"] = bytes.NewReader(content) + values["parent_dir"] = bytes.NewBuffer([]byte("/")) + // values["relative_path"] = bytes.NewBuffer([]byte(relativePath)) + values["replace"] = bytes.NewBuffer([]byte("0")) + form, contentType, err := createForm(values, "111.md") + if err != nil { + log.Fatal("Failed to create multipart form: %v", err) + } + headers := make(map[string][]string) + headers["Content-Type"] = []string{contentType} + // headers["Authorization"] = []string{"Token " + accessToken.(string)} + status, body, err := HttpCommon("POST", url, headers, form) + + log.Printf("[%d]upload status: %d return body: %s error: %v\n", i, status, string(body), err) + group.Done() + }(i) + } + group.Wait() +} + +func createForm(values map[string]io.Reader, name string) (io.Reader, string, error) { + buf := new(bytes.Buffer) + w := multipart.NewWriter(buf) + defer w.Close() + + for k, v := range values { + var fw io.Writer + var err error + if k == "file" { + if fw, err = w.CreateFormFile(k, name); err != nil { + return nil, "", err + } + } else { + if fw, err = w.CreateFormField(k); err != nil { + return nil, "", err + } + } + if _, err = io.Copy(fw, v); err != nil { + return nil, "", err + } + } + + return buf, w.FormDataContentType(), nil +} + +func HttpCommon(method, url string, header map[string][]string, reader io.Reader) (int, []byte, error) { + req, err := http.NewRequest(method, url, reader) + if err != nil { + return -1, nil, err + } + req.Header = header + + rsp, err := http.DefaultClient.Do(req) + if err != nil { + return -1, nil, err + } + defer rsp.Body.Close() + + if rsp.StatusCode == http.StatusNotFound { + return rsp.StatusCode, nil, fmt.Errorf("url %s not found", url) + } + body, err := io.ReadAll(rsp.Body) + if err != nil { + return rsp.StatusCode, nil, err + } + + return rsp.StatusCode, body, nil +} + +func getToken() string { + url := fmt.Sprintf("%s:8000/api2/auth-token/", options.server) + header := make(map[string][]string) + header["Content-Type"] = []string{"application/x-www-form-urlencoded"} + data := []byte(fmt.Sprintf("username=%s&password=%s", options.username, options.password)) + _, body, err := HttpCommon("POST", url, header, bytes.NewReader(data)) + if err != nil { + return "" + } + tokenMap := make(map[string]interface{}) + err = json.Unmarshal(body, &tokenMap) + if err != nil { + return "" + } + token, _ := tokenMap["token"].(string) + return token +}