From caee7ef963662a58d3bf3855c50427531c9ac22b Mon Sep 17 00:00:00 2001 From: cuihaikuo Date: Tue, 6 Feb 2018 14:37:27 +0800 Subject: [PATCH] Support background task for indexing blocks. --- common/cdc/cdc.c | 15 +- common/cdc/cdc.h | 6 +- common/fs-mgr.c | 20 ++- common/fs-mgr.h | 3 +- common/rpc-service.c | 1 + server/Makefile.am | 2 + server/http-server.c | 17 ++ server/http-server.h | 1 + server/index-blocks-mgr.c | 341 ++++++++++++++++++++++++++++++++++++++ server/index-blocks-mgr.h | 41 +++++ server/repo-mgr.h | 12 ++ server/repo-op.c | 134 ++++++++++----- server/seafile-session.c | 4 + server/seafile-session.h | 2 + server/upload-file.c | 67 ++++++-- 15 files changed, 601 insertions(+), 65 deletions(-) create mode 100644 server/index-blocks-mgr.c create mode 100644 server/index-blocks-mgr.h diff --git a/common/cdc/cdc.c b/common/cdc/cdc.c index b2222f7..d676890 100644 --- a/common/cdc/cdc.c +++ b/common/cdc/cdc.c @@ -112,7 +112,8 @@ do { \ int file_chunk_cdc(int fd_src, CDCFileDescriptor *file_descr, SeafileCrypt *crypt, - gboolean write_data) + gboolean write_data, + gint64 *indexed) { char *buf; uint32_t buf_sz; @@ -180,7 +181,10 @@ int file_chunk_cdc(int fd_src, free (buf); return -1; } + gint64 idx_size = tail; WRITE_CDC_BLOCK (tail, write_data); + if (indexed) + *indexed += idx_size; } break; } @@ -206,8 +210,10 @@ int file_chunk_cdc(int fd_src, free (buf); return -1; } - + gint64 idx_size = cur + 1; WRITE_CDC_BLOCK (cur + 1, write_data); + if (indexed) + *indexed += idx_size; break; } else { cur ++; @@ -225,7 +231,8 @@ int file_chunk_cdc(int fd_src, int filename_chunk_cdc(const char *filename, CDCFileDescriptor *file_descr, SeafileCrypt *crypt, - gboolean write_data) + gboolean write_data, + gint64 *indexed) { int fd_src = seaf_util_open (filename, O_RDONLY | O_BINARY); if (fd_src < 0) { @@ -233,7 +240,7 @@ int filename_chunk_cdc(const char *filename, return -1; } - int ret = file_chunk_cdc (fd_src, file_descr, crypt, write_data); + int ret = file_chunk_cdc (fd_src, file_descr, crypt, write_data, indexed); close (fd_src); return ret; } diff --git a/common/cdc/cdc.h b/common/cdc/cdc.h index 75fb900..9d93630 100644 --- a/common/cdc/cdc.h +++ b/common/cdc/cdc.h @@ -60,12 +60,14 @@ typedef struct _CDCDescriptor { int file_chunk_cdc(int fd_src, CDCFileDescriptor *file_descr, struct SeafileCrypt *crypt, - gboolean write_data); + gboolean write_data, + gint64 *indexed); int filename_chunk_cdc(const char *filename, CDCFileDescriptor *file_descr, struct SeafileCrypt *crypt, - gboolean write_data); + gboolean write_data, + gint64 *indexed); void cdc_init (); diff --git a/common/fs-mgr.c b/common/fs-mgr.c index 68d3e40..86a8d20 100644 --- a/common/fs-mgr.c +++ b/common/fs-mgr.c @@ -678,7 +678,8 @@ split_file_to_block (const char *repo_id, gint64 file_size, SeafileCrypt *crypt, CDCFileDescriptor *cdc, - gboolean write_data) + gboolean write_data, + gint64 *indexed) { int n_blocks; uint8_t *block_sha1s = NULL; @@ -739,9 +740,14 @@ split_file_to_block (const char *repo_id, ret = -1; goto out; } + if (indexed) + *indexed += seaf->http_server->fixed_block_size; - if ((--n_pending) <= 0) + if ((--n_pending) <= 0) { + if (indexed) + *indexed = (guint64)file_size; break; + } } cdc->block_nr = n_blocks; @@ -774,7 +780,8 @@ seaf_fs_manager_index_blocks (SeafFSManager *mgr, gint64 *size, SeafileCrypt *crypt, gboolean write_data, - gboolean use_cdc) + gboolean use_cdc, + gint64 *indexed) { SeafStat sb; CDCFileDescriptor cdc; @@ -792,7 +799,6 @@ seaf_fs_manager_index_blocks (SeafFSManager *mgr, create_cdc_for_empty_file (&cdc); } else { memset (&cdc, 0, sizeof(cdc)); - #if defined SEAFILE_SERVER && defined FULL_FEATURE if (use_cdc || version == 0) { cdc.block_sz = CDC_AVERAGE_BLOCK_SIZE; @@ -801,7 +807,7 @@ seaf_fs_manager_index_blocks (SeafFSManager *mgr, cdc.write_block = seafile_write_chunk; memcpy (cdc.repo_id, repo_id, 36); cdc.version = version; - if (filename_chunk_cdc (file_path, &cdc, crypt, write_data) < 0) { + if (filename_chunk_cdc (file_path, &cdc, crypt, write_data, indexed) < 0) { seaf_warning ("Failed to chunk file with CDC.\n"); return -1; } @@ -810,7 +816,7 @@ seaf_fs_manager_index_blocks (SeafFSManager *mgr, cdc.version = version; cdc.file_size = sb.st_size; if (split_file_to_block (repo_id, version, file_path, sb.st_size, - crypt, &cdc, write_data) < 0) { + crypt, &cdc, write_data, indexed) < 0) { return -1; } } @@ -821,7 +827,7 @@ seaf_fs_manager_index_blocks (SeafFSManager *mgr, cdc.write_block = seafile_write_chunk; memcpy (cdc.repo_id, repo_id, 36); cdc.version = version; - if (filename_chunk_cdc (file_path, &cdc, crypt, write_data) < 0) { + if (filename_chunk_cdc (file_path, &cdc, crypt, write_data, indexed) < 0) { seaf_warning ("Failed to chunk file with CDC.\n"); return -1; } diff --git a/common/fs-mgr.h b/common/fs-mgr.h index 4fe8870..7151caf 100644 --- a/common/fs-mgr.h +++ b/common/fs-mgr.h @@ -220,7 +220,8 @@ seaf_fs_manager_index_blocks (SeafFSManager *mgr, gint64 *size, SeafileCrypt *crypt, gboolean write_data, - gboolean use_cdc); + gboolean use_cdc, + gint64 *indexed); Seafile * seaf_fs_manager_get_seafile (SeafFSManager *mgr, diff --git a/common/rpc-service.c b/common/rpc-service.c index 219e1ef..ddf7fbe 100644 --- a/common/rpc-service.c +++ b/common/rpc-service.c @@ -3361,6 +3361,7 @@ seafile_post_multi_files (const char *repo_id, user, replace_existed, &ret_json, + NULL, error); out: diff --git a/server/Makefile.am b/server/Makefile.am index 2491f5e..7af1200 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -54,12 +54,14 @@ noinst_HEADERS = web-accesstoken-mgr.h chunkserv-mgr.h seafile-session.h \ fileserver-config.h \ http-status-codes.h \ zip-download-mgr.h \ + index-blocks-mgr.h \ $(proc_headers) seaf_server_SOURCES = \ seaf-server.c \ web-accesstoken-mgr.c chunkserv-mgr.c seafile-session.c \ zip-download-mgr.c \ + index-blocks-mgr.c \ share-mgr.c \ token-mgr.c \ passwd-mgr.c \ diff --git a/server/http-server.c b/server/http-server.c index 0d8f744..3ee3f8f 100644 --- a/server/http-server.c +++ b/server/http-server.c @@ -33,6 +33,7 @@ #define DEFAULT_WORKER_THREADS 10 #define DEFAULT_MAX_DOWNLOAD_DIR_SIZE 100 * ((gint64)1 << 20) /* 100MB */ #define DEFAULT_MAX_INDEXING_THREADS 1 +#define DEFAULT_MAX_INDEX_PROCESSING_THREADS 3 #define DEFAULT_FIXED_BLOCK_SIZE ((gint64)1 << 23) /* 8MB */ #define HOST "host" @@ -118,6 +119,7 @@ load_http_config (HttpServerStruct *htp_server, SeafileSession *session) int fixed_block_size_mb; char *encoding; int max_indexing_threads; + int max_index_processing_threads; host = fileserver_config_get_string (session->config, HOST, &error); if (!error) { @@ -205,6 +207,21 @@ load_http_config (HttpServerStruct *htp_server, SeafileSession *session) seaf_message ("fileserver: max_indexing_threads = %d\n", htp_server->max_indexing_threads); + max_index_processing_threads = fileserver_config_get_integer (session->config, + "max_index_processing_threads", + &error); + if (error) { + htp_server->max_index_processing_threads = DEFAULT_MAX_INDEX_PROCESSING_THREADS; + g_clear_error (&error); + } else { + if (max_index_processing_threads <= 0) + htp_server->max_index_processing_threads = DEFAULT_MAX_INDEX_PROCESSING_THREADS; + else + htp_server->max_index_processing_threads = max_index_processing_threads; + } + seaf_message ("fileserver: max_index_processing_threads= %d\n", + htp_server->max_index_processing_threads); + encoding = g_key_file_get_string (session->config, "zip", "windows_encoding", &error); diff --git a/server/http-server.h b/server/http-server.h index 7ff4c33..eb04aec 100644 --- a/server/http-server.h +++ b/server/http-server.h @@ -20,6 +20,7 @@ struct _HttpServerStruct { int web_token_expire_time; int max_indexing_threads; int worker_threads; + int max_index_processing_threads; }; typedef struct _HttpServerStruct HttpServerStruct; diff --git a/server/index-blocks-mgr.c b/server/index-blocks-mgr.c new file mode 100644 index 0000000..67e675e --- /dev/null +++ b/server/index-blocks-mgr.c @@ -0,0 +1,341 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include "common.h" + +#include + +#include +#include + +#include +#include +#include "utils.h" +#include "log.h" + +#include "seafile-session.h" +#include "repo-mgr.h" +#include "fs-mgr.h" +#include "seafile-error.h" +#include "seafile-crypt.h" +#include "index-blocks-mgr.h" + +#define TOKEN_LEN 36 +#define PROGRESS_TTL 5 * 3600 // 5 hours +#define SCAN_PROGRESS_INTERVAL 24 * 3600 // 1 day + +static void +start_index_task (gpointer data, gpointer user_data); + +static char * +gen_new_token (GHashTable *token_hash); + +static int +scan_progress (void *data); + +struct SeafileCrypt; + +typedef struct IndexBlksMgrPriv { + pthread_mutex_t progress_lock; + GHashTable *progress_store; + GThreadPool *idx_tpool; + // This timer is used to scan progress and remove invalid progress. + CcnetTimer *scan_progress_timer; +} IndexBlksMgrPriv; + +typedef struct IndexPara { + GList *filenames; + GList *paths; + SeafRepo *repo; + char *user; + char *canon_path; + int replace_existed; + SeafileCrypt *crypt; + gboolean ret_json; + IdxProgress *progress; +} IndexPara; + +static void +free_progress (IdxProgress *progress) +{ + if (!progress) + return; + + g_free (progress->ret_json); + g_free (progress); +} + + +IndexBlksMgr * +index_blocks_mgr_new (SeafileSession *session) +{ + GError *error = NULL; + IndexBlksMgr *mgr = g_new0 (IndexBlksMgr, 1); + IndexBlksMgrPriv *priv = g_new0 (IndexBlksMgrPriv, 1); + + priv->idx_tpool = g_thread_pool_new (start_index_task, + priv, + session->http_server->max_index_processing_threads, + FALSE, &error); + if (!priv->idx_tpool) { + if (error) { + seaf_warning ("Failed to create index task thread pool: %s.\n", error->message); + g_clear_error (&error); + } else { + seaf_warning ("Failed to create index task thread pool.\n"); + } + g_free (priv); + g_free (mgr); + return NULL; + } + + pthread_mutex_init (&priv->progress_lock, NULL); + priv->progress_store = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, + (GDestroyNotify)free_progress); + priv->scan_progress_timer = ccnet_timer_new (scan_progress, priv, + SCAN_PROGRESS_INTERVAL * 1000); + mgr->priv = priv; + + return mgr; +} + +static int +scan_progress (void *data) +{ + time_t now = time(NULL); + IndexBlksMgrPriv *priv = data; + GHashTableIter iter; + gpointer key, value; + IdxProgress *progress; + + pthread_mutex_lock (&priv->progress_lock); + + g_hash_table_iter_init (&iter, priv->progress_store); + while (g_hash_table_iter_next (&iter, &key, &value)) { + progress = value; + if (now >= progress->expire_ts && progress->status != 1) { + g_hash_table_iter_remove (&iter); + } + } + + pthread_mutex_unlock (&priv->progress_lock); + + return TRUE; +} + +static void +free_index_para (IndexPara *idx_para) +{ + if (!idx_para) + return; + + string_list_free (idx_para->filenames); + string_list_free (idx_para->paths); + seaf_repo_unref (idx_para->repo); + g_free (idx_para->user); + g_free (idx_para->canon_path); + g_free (idx_para->crypt); + g_free (idx_para); +} + +static void +start_index_task (gpointer data, gpointer user_data) +{ + IndexPara *idx_para = data; + SeafRepo *repo = idx_para->repo; + GList *ptr = NULL, *id_list = NULL, *size_list = NULL; + char *path; + char *ret_json; + char hex[41]; + unsigned char sha1[20]; + int ret = 0; + IdxProgress *progress = idx_para->progress; + SeafileCrypt *crypt = idx_para->crypt; + + gint64 *size; + for (ptr = idx_para->paths; ptr; ptr = ptr->next) { + path = ptr->data; + + size = g_new (gint64, 1); + if (seaf_fs_manager_index_blocks (seaf->fs_mgr, + repo->store_id, repo->version, + path, sha1, size, crypt, TRUE, FALSE, &(progress->indexed)) < 0) { + seaf_warning ("failed to index blocks"); + progress->status = -1; + goto out; + } + + rawdata_to_hex(sha1, hex, 20); + id_list = g_list_prepend (id_list, g_strdup(hex)); + size_list = g_list_prepend (size_list, size); + } + id_list = g_list_reverse (id_list); + size_list = g_list_reverse (size_list); + ret = post_files_and_gen_commit (idx_para->filenames, + idx_para->repo, + idx_para->user, + idx_para->ret_json ? &ret_json : NULL, + idx_para->replace_existed, + idx_para->canon_path, + id_list, + size_list, + NULL); + progress->status = ret; + if (idx_para->ret_json) { + progress->ret_json = g_strdup(ret_json); + g_free (ret_json); + } + +out: + /* remove temp files */ + for (ptr = idx_para->paths; ptr; ptr = ptr->next) + g_unlink (ptr->data); + + g_list_free_full (id_list, g_free); + g_list_free_full (size_list, g_free); + free_index_para (idx_para); + return; +} + +char * +index_blocks_mgr_query_progress (IndexBlksMgr *mgr, + const char *token, + GError **error) +{ + char *ret_info; + json_t *obj; + IdxProgress *progress; + IndexBlksMgrPriv *priv = mgr->priv; + + pthread_mutex_lock (&priv->progress_lock); + progress = g_hash_table_lookup (priv->progress_store, token); + pthread_mutex_unlock (&priv->progress_lock); + + if (!progress) { + seaf_warning ("Index progress not found for token %s\n", token); + g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, + "Index progress not found"); + return NULL; + } + + obj = json_object (); + json_object_set_int_member (obj, "indexed", progress->indexed); + json_object_set_int_member (obj, "total", progress->total); + json_object_set_int_member (obj, "status", progress->status); + json_object_set_string_member (obj, "ret_json", progress->ret_json); + ret_info = json_dumps (obj, JSON_COMPACT); + json_decref (obj); + + /* index finished */ + if (progress->status != 1) { + pthread_mutex_lock (&priv->progress_lock); + g_hash_table_remove (priv->progress_store, token); + pthread_mutex_unlock (&priv->progress_lock); + } + + return ret_info; +} + +int +index_blocks_mgr_start_index (IndexBlksMgr *mgr, + GList *filenames, + GList *paths, + const char *repo_id, + const char *user, + int replace_existed, + gboolean ret_json, + const char *canon_path, + SeafileCrypt *crypt, + char **task_id) +{ + GList *ptr = NULL; + char *path = NULL, *token = NULL; + SeafileCrypt *_crypt = NULL; + + SeafRepo *repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id); + if (!repo) { + seaf_warning ("Failed to get repo %.8s.\n", repo_id); + return -1; + } + IndexBlksMgrPriv *priv = mgr->priv; + + token = gen_new_token(priv->progress_store); + if (!token) { + seaf_warning ("Failed to genarate index token for repo %.8s.\n", repo_id); + seaf_repo_unref (repo); + return -1; + } + if (crypt) { + _crypt = g_new0(SeafileCrypt, 1); + memcpy (_crypt, crypt, sizeof (SeafileCrypt)); + } + + *task_id = g_strdup (token); + IdxProgress *progress = g_new0(IdxProgress, 1); + progress->status = 1; + + IndexPara *idx_para = g_new0 (IndexPara, 1); + idx_para->filenames = g_list_copy_deep (filenames, (GCopyFunc)g_strdup, NULL); + idx_para->paths = g_list_copy_deep (paths, (GCopyFunc)g_strdup, NULL); + idx_para->repo = repo; + idx_para->user = g_strdup (user); + idx_para->canon_path = g_strdup(canon_path); + idx_para->replace_existed = replace_existed; + idx_para->ret_json = ret_json; + idx_para->crypt = _crypt; + idx_para->progress = progress; + + progress->status = 1; + progress->expire_ts = time(NULL) + PROGRESS_TTL; + + /* Get total size of all files for progress. */ + for (ptr = paths; ptr; ptr = ptr->next) { + SeafStat sb; + path = ptr->data; + if (seaf_stat (path, &sb) < 0) { + seaf_warning ("Bad file %s: %s.\n", path, strerror(errno)); + goto error; + } + + if (!S_ISREG(sb.st_mode)) + goto error; + + progress->total += (gint64)sb.st_size; + } + + pthread_mutex_lock (&priv->progress_lock); + g_hash_table_replace (priv->progress_store, g_strdup (token), progress); + pthread_mutex_unlock (&priv->progress_lock); + + g_thread_pool_push (priv->idx_tpool, idx_para, NULL); + + g_free (token); + return 0; + +error: + g_free (token); + /* remove temp files */ + for (ptr = idx_para->paths; ptr; ptr = ptr->next) + g_unlink (ptr->data); + + free_index_para (idx_para); + + return -1; +} + +static char * +gen_new_token (GHashTable *token_hash) +{ + char uuid[37]; + char *token; + + while (1) { + gen_uuid_inplace (uuid); + token = g_strndup(uuid, TOKEN_LEN); + + /* Make sure the new token doesn't conflict with an existing one. */ + if (g_hash_table_lookup (token_hash, token) != NULL) + g_free (token); + else + return token; + } +} diff --git a/server/index-blocks-mgr.h b/server/index-blocks-mgr.h new file mode 100644 index 0000000..830e052 --- /dev/null +++ b/server/index-blocks-mgr.h @@ -0,0 +1,41 @@ +#ifndef INDEX_BLOCKS_MGR_H +#define INDEX_BLOCKS_MGR_H + +#include "seafile-object.h" + +struct IndexBlksMgrPriv; +struct _SeafileSession; + +typedef struct IndexBlksMgr { + struct IndexBlksMgrPriv *priv; +} IndexBlksMgr; + +typedef struct IdxProgress { + gint64 indexed; + gint64 total; + int status; /* 0: finished, -1: error, 1: indexing */ + char *ret_json; + gint64 expire_ts; +} IdxProgress; + +IndexBlksMgr * +index_blocks_mgr_new (struct _SeafileSession *session); + +char * +index_blocks_mgr_query_progress (IndexBlksMgr *mgr, + const char *token, + GError **error); + +int +index_blocks_mgr_start_index (IndexBlksMgr *mgr, + GList *filenames, + GList *paths, + const char *repo_id, + const char *user, + int replace_existed, + gboolean ret_json, + const char *canon_path, + SeafileCrypt *crypt, + char **task_id); + +#endif diff --git a/server/repo-mgr.h b/server/repo-mgr.h index 67171a1..bdf74c6 100644 --- a/server/repo-mgr.h +++ b/server/repo-mgr.h @@ -316,6 +316,7 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, const char *user, int replace_existed, char **new_ids, + char **task_id, GError **error); /* int */ @@ -849,4 +850,15 @@ seaf_repo_manager_set_subdir_group_perm_by_path (SeafRepoManager *mgr, int group_id, const char *permission, const char *path); + +int +post_files_and_gen_commit (GList *filenames, + SeafRepo *repo, + const char *user, + char **ret_json, + int replace_existed, + const char *canon_path, + GList *id_list, + GList *size_list, + GError **error); #endif diff --git a/server/repo-op.c b/server/repo-op.c index dd70f99..4b7409e 100644 --- a/server/repo-op.c +++ b/server/repo-op.c @@ -38,6 +38,17 @@ should_ignore_file(const char *filename, void *data); static gboolean is_virtual_repo_and_origin (SeafRepo *repo1, SeafRepo *repo2); +int +post_files_and_gen_commit (GList *filenames, + SeafRepo *repo, + const char *user, + char **ret_json, + int replace_existed, + const char *canon_path, + GList *id_list, + GList *size_list, + GError **error); + /* * Repo operations. */ @@ -668,7 +679,7 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr, if (seaf_fs_manager_index_blocks (seaf->fs_mgr, repo->store_id, repo->version, temp_file_path, - sha1, &size, crypt, TRUE, FALSE) < 0) { + sha1, &size, crypt, TRUE, FALSE, NULL) < 0) { seaf_warning ("failed to index blocks"); g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, "Failed to index blocks"); @@ -986,23 +997,19 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, const char *user, int replace_existed, char **ret_json, + char **task_id, GError **error) { SeafRepo *repo = NULL; - SeafCommit *head_commit = NULL; char *canon_path = NULL; - GList *filenames = NULL, *paths = NULL, *id_list = NULL, *name_list = NULL, - *size_list = NULL, *ptr; + GList *filenames = NULL, *paths = NULL, *id_list = NULL, *size_list = NULL, *ptr; char *filename, *path; unsigned char sha1[20]; - GString *buf = g_string_new (NULL); - char *root_id = NULL; SeafileCrypt *crypt = NULL; char hex[41]; int ret = 0; GET_REPO_OR_FAIL(repo, repo_id); - GET_COMMIT_OR_FAIL(head_commit, repo->id, repo->version, repo->head->commit_id); canon_path = get_canonical_path (parent_dir); @@ -1051,27 +1058,84 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, crypt = seafile_crypt_new (repo->enc_version, key, iv); } - gint64 *size; - for (ptr = paths; ptr; ptr = ptr->next) { - path = ptr->data; + if (!task_id) { + gint64 *size; + for (ptr = paths; ptr; ptr = ptr->next) { + path = ptr->data; - size = g_new (gint64, 1); - if (seaf_fs_manager_index_blocks (seaf->fs_mgr, - repo->store_id, repo->version, - path, sha1, size, crypt, TRUE, FALSE) < 0) { - seaf_warning ("failed to index blocks"); - g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, - "Failed to index blocks"); - ret = -1; - goto out; + size = g_new (gint64, 1); + if (seaf_fs_manager_index_blocks (seaf->fs_mgr, + repo->store_id, repo->version, + path, sha1, size, crypt, TRUE, FALSE, NULL) < 0) { + seaf_warning ("failed to index blocks"); + g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, + "Failed to index blocks"); + ret = -1; + goto out; + } + + rawdata_to_hex(sha1, hex, 20); + id_list = g_list_prepend (id_list, g_strdup(hex)); + size_list = g_list_prepend (size_list, size); } + id_list = g_list_reverse (id_list); + size_list = g_list_reverse (size_list); - rawdata_to_hex(sha1, hex, 20); - id_list = g_list_prepend (id_list, g_strdup(hex)); - size_list = g_list_prepend (size_list, size); + ret = post_files_and_gen_commit (filenames, + repo, + user, + ret_json, + replace_existed, + canon_path, + id_list, + size_list, + error); + } else { + ret = index_blocks_mgr_start_index (seaf->index_blocks_mgr, + filenames, + paths, + repo_id, + user, + replace_existed, + ret_json == NULL ? FALSE : TRUE, + canon_path, + crypt, + task_id); } - id_list = g_list_reverse (id_list); - size_list = g_list_reverse (size_list); + +out: + if (repo) + seaf_repo_unref (repo); + string_list_free (filenames); + string_list_free (paths); + string_list_free (id_list); + for (ptr = size_list; ptr; ptr = ptr->next) + g_free (ptr->data); + g_list_free (size_list); + g_free (canon_path); + g_free (crypt); + + return ret; +} + +int +post_files_and_gen_commit (GList *filenames, + SeafRepo *repo, + const char *user, + char **ret_json, + int replace_existed, + const char *canon_path, + GList *id_list, + GList *size_list, + GError **error) +{ + GList *name_list = NULL; + GString *buf = g_string_new (NULL); + SeafCommit *head_commit = NULL; + char *root_id = NULL; + int ret = 0; + + GET_COMMIT_OR_FAIL(head_commit, repo->id, repo->version, repo->head->commit_id); /* Add the files to parent dir and commit. */ root_id = do_post_multi_files (repo, head_commit->root_id, canon_path, @@ -1085,7 +1149,6 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, ret = -1; goto out; } - guint len = g_list_length (filenames); if (len > 1) g_string_printf (buf, "Added \"%s\" and %u more files.", @@ -1093,36 +1156,25 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, else g_string_printf (buf, "Added \"%s\".", (char *)(filenames->data)); - if (gen_new_commit (repo_id, head_commit, root_id, + if (gen_new_commit (repo->id, head_commit, root_id, user, buf->str, NULL, error) < 0) { ret = -1; goto out; } - seaf_repo_manager_merge_virtual_repo (mgr, repo_id, NULL); + seaf_repo_manager_merge_virtual_repo (seaf->repo_mgr, repo->id, NULL); if (ret_json) *ret_json = format_json_ret (name_list, id_list, size_list); + update_repo_size(repo->id); + out: - if (repo) - seaf_repo_unref (repo); if (head_commit) seaf_commit_unref(head_commit); - string_list_free (filenames); - string_list_free (paths); - string_list_free (id_list); string_list_free (name_list); - for (ptr = size_list; ptr; ptr = ptr->next) - g_free (ptr->data); - g_list_free (size_list); g_string_free (buf, TRUE); g_free (root_id); - g_free (canon_path); - g_free (crypt); - - if (ret == 0) - update_repo_size(repo_id); return ret; } @@ -3846,7 +3898,7 @@ seaf_repo_manager_put_file (SeafRepoManager *mgr, if (seaf_fs_manager_index_blocks (seaf->fs_mgr, repo->store_id, repo->version, temp_file_path, - sha1, &size, crypt, TRUE, FALSE) < 0) { + sha1, &size, crypt, TRUE, FALSE, NULL) < 0) { seaf_warning ("failed to index blocks"); g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, "Failed to index blocks"); diff --git a/server/seafile-session.c b/server/seafile-session.c index e90803a..88bb831 100644 --- a/server/seafile-session.c +++ b/server/seafile-session.c @@ -169,6 +169,10 @@ seafile_session_new(const char *central_config_dir, if (!session->zip_download_mgr) goto onerror; + session->index_blocks_mgr = index_blocks_mgr_new (session); + if (!session->index_blocks_mgr) + goto onerror; + return session; onerror: diff --git a/server/seafile-session.h b/server/seafile-session.h index 245e034..2652fa4 100644 --- a/server/seafile-session.h +++ b/server/seafile-session.h @@ -30,6 +30,7 @@ #include "http-server.h" #include "zip-download-mgr.h" +#include "index-blocks-mgr.h" #include @@ -89,6 +90,7 @@ struct _SeafileSession { HttpServerStruct *http_server; ZipDownloadMgr *zip_download_mgr; + IndexBlksMgr *index_blocks_mgr; }; extern SeafileSession *seaf; diff --git a/server/upload-file.c b/server/upload-file.c index 89865d7..2619628 100644 --- a/server/upload-file.c +++ b/server/upload-file.c @@ -73,6 +73,8 @@ typedef struct RecvFSM { /* For upload progress. */ char *progress_id; Progress *progress; + + gboolean need_idx_progress; } RecvFSM; #define MAX_CONTENT_LINE 10240 @@ -454,6 +456,7 @@ upload_cb(evhtp_request_t *req, void *arg) fsm->user, 0, NULL, + NULL, &error); g_free (filenames_json); g_free (tmp_files_json); @@ -572,6 +575,7 @@ upload_api_cb(evhtp_request_t *req, void *arg) tmp_files_json = file_list_to_json (fsm->files); char *ret_json = NULL; + char *task_id = NULL; int rc = seaf_repo_manager_post_multi_files (seaf->repo_mgr, fsm->repo_id, parent_dir, @@ -580,6 +584,7 @@ upload_api_cb(evhtp_request_t *req, void *arg) fsm->user, replace, &ret_json, + fsm->need_idx_progress ? &task_id : NULL, &error); g_free (filenames_json); g_free (tmp_files_json); @@ -593,14 +598,19 @@ upload_api_cb(evhtp_request_t *req, void *arg) goto error; } - const char *use_json = evhtp_kv_find (req->uri->query, "ret-json"); - if (use_json) { - evbuffer_add (req->buffer_out, ret_json, strlen(ret_json)); + if (task_id) { + evbuffer_add (req->buffer_out, task_id, strlen(task_id)); + g_free (task_id); } else { - char *new_ids = file_id_list_from_json (ret_json); - if (new_ids) - evbuffer_add (req->buffer_out, new_ids, strlen(new_ids)); - g_free (new_ids); + const char *use_json = evhtp_kv_find (req->uri->query, "ret-json"); + if (use_json) { + evbuffer_add (req->buffer_out, ret_json, strlen(ret_json)); + } else { + char *new_ids = file_id_list_from_json (ret_json); + if (new_ids) + evbuffer_add (req->buffer_out, new_ids, strlen(new_ids)); + g_free (new_ids); + } } g_free (ret_json); @@ -1036,6 +1046,7 @@ upload_ajax_cb(evhtp_request_t *req, void *arg) tmp_files_json = file_list_to_json (fsm->files); char *ret_json = NULL; + char *task_id = NULL; rc = seaf_repo_manager_post_multi_files (seaf->repo_mgr, fsm->repo_id, parent_dir, @@ -1044,6 +1055,7 @@ upload_ajax_cb(evhtp_request_t *req, void *arg) fsm->user, 0, &ret_json, + fsm->need_idx_progress ? &task_id : NULL, &error); if (abs_path) g_free (abs_path); @@ -1059,7 +1071,12 @@ upload_ajax_cb(evhtp_request_t *req, void *arg) goto error; } - evbuffer_add (req->buffer_out, ret_json, strlen(ret_json)); + if (task_id) { + evbuffer_add (req->buffer_out, task_id, strlen(task_id)); + g_free (task_id); + } else { + evbuffer_add (req->buffer_out, ret_json, strlen(ret_json)); + } g_free (ret_json); // send_success_reply (req); @@ -1703,8 +1720,10 @@ upload_finish_cb (evhtp_request_t *req, void *arg) } g_free (fsm->tmp_file); - for (ptr = fsm->tmp_files; ptr; ptr = ptr->next) - g_unlink ((char *)(ptr->data)); + if (!fsm->need_idx_progress) { + for (ptr = fsm->tmp_files; ptr; ptr = ptr->next) + g_unlink ((char *)(ptr->data)); + } string_list_free (fsm->tmp_files); string_list_free (fsm->filenames); string_list_free (fsm->files); @@ -2246,6 +2265,9 @@ upload_headers_cb (evhtp_request_t *req, evhtp_headers_t *hdr, void *arg) fsm->line = evbuffer_new (); fsm->form_kvs = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free); + const char *need_idx_progress = evhtp_kv_find (req->uri->query, "need_idx_progress"); + if (g_strcmp0(need_idx_progress, "true") == 0) + fsm->need_idx_progress = TRUE; if (progress_id != NULL) { progress = g_new0 (Progress, 1); @@ -2286,6 +2308,29 @@ err: return EVHTP_RES_OK; } +static void +idx_progress_cb(evhtp_request_t *req, void *arg) +{ + const char *progress_id; + + progress_id = evhtp_kv_find (req->uri->query, "task_id"); + if (!progress_id) { + seaf_debug ("[get pg] Index task id not found in url.\n"); + send_error_reply (req, EVHTP_RES_BADREQ, "task id not found"); + return; + } + char *progress_info = index_blocks_mgr_query_progress (seaf->index_blocks_mgr, + progress_id, NULL); + if (!progress_info) { + send_error_reply (req, EVHTP_RES_NOTFOUND, "Failed to get index progress"); + return; + } + evbuffer_add (req->buffer_out, progress_info, strlen(progress_info)); + send_success_reply (req); + + g_free (progress_info); +} + static void upload_progress_cb(evhtp_request_t *req, void *arg) { @@ -2379,6 +2424,8 @@ upload_file_init (evhtp_t *htp, const char *http_temp_dir) evhtp_set_regex_cb (htp, "^/upload_progress.*", upload_progress_cb, NULL); + evhtp_set_regex_cb (htp, "^/idx_progress.*", idx_progress_cb, NULL); + upload_progress = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free); pthread_mutex_init (&pg_lock, NULL);