diff --git a/server/http-server.c b/server/http-server.c index 88ab9c9..21fe812 100644 --- a/server/http-server.c +++ b/server/http-server.c @@ -54,6 +54,9 @@ #define PERM_EXPIRE_TIME 7200 /* 2 hours */ #define VIRINFO_EXPIRE_TIME 7200 /* 2 hours */ +#define FS_ID_LIST_MAX_WORKERS 3 +#define FS_ID_LIST_TOKEN_LEN 36 + struct _HttpServer { evbase_t *evbase; evhtp_t *evhtp; @@ -69,6 +72,11 @@ struct _HttpServer { pthread_mutex_t vir_repo_info_cache_lock; event_t *reap_timer; + + GThreadPool *compute_fs_obj_id_pool; + + GHashTable *fs_obj_ids; + pthread_mutex_t fs_obj_ids_lock; }; typedef struct _HttpServer HttpServer; @@ -115,6 +123,9 @@ const char *GET_HEAD_COMMITS_MULTI_REGEX = "^/repo/head-commits-multi"; const char *COMMIT_OPER_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/commit/[\\da-z]{40}"; const char *PUT_COMMIT_INFO_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/commit/[\\da-z]{40}"; const char *GET_FS_OBJ_ID_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/fs-id-list/.*"; +const char *START_FS_OBJ_ID_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/start-fs-id-list/.*"; +const char *QUERY_FS_OBJ_ID_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/query-fs-id-list/.*"; +const char *RETRIEVE_FS_OBJ_ID_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/retrieve-fs-id-list/.*"; const char *BLOCK_OPER_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/block/[\\da-z]{40}"; const char *POST_CHECK_FS_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/check-fs"; const char *POST_CHECK_BLOCK_REGEX = "^/repo/[\\da-z]{8}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{4}-[\\da-z]{12}/check-blocks"; @@ -1531,6 +1542,293 @@ out: seaf_repo_unref (repo); } +typedef struct ComputeObjTask { + HttpServer *htp_server; + char *token; + char *repo_id; + char *client_head; + char *server_head; + gboolean dir_only; +} ComputeObjTask; + +typedef struct CalObjResult { + GList *list; + gboolean done; +} CalObjResult; + +static void +free_compute_obj_task(ComputeObjTask *task) +{ + if (!task) + return; + + if (task->token) + g_free(task->token); + if (task->repo_id) + g_free(task->repo_id); + if (task->client_head) + g_free(task->client_head); + if (task->server_head) + g_free(task->server_head); + g_free(task); +} + +static void +free_obj_cal_result (gpointer data) +{ + CalObjResult *result = (CalObjResult *)data; + if (!result) + return; + + if (result->list) + g_list_free (result->list); + + g_free(result); +} + +static void +compute_fs_obj_id (gpointer ptask, gpointer ppara) +{ + SeafRepo *repo = NULL; + ComputeObjTask *task = ptask; + const char *client_head = task->client_head; + const char *server_head = task->server_head; + char *repo_id = task->repo_id; + gboolean dir_only = task->dir_only; + HttpServer *htp_server = task->htp_server; + CalObjResult *result = NULL; + + pthread_mutex_lock (&htp_server->fs_obj_ids_lock); + result = g_hash_table_lookup (htp_server->fs_obj_ids, task->token); + pthread_mutex_unlock (&htp_server->fs_obj_ids_lock); + if (!result) { + goto out; + } + + repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id); + if (!repo) { + seaf_warning ("Failed to find repo %.8s.\n", repo_id); + goto out; + } + + if (calculate_send_object_list (repo, server_head, client_head, dir_only, &result->list) < 0) { + pthread_mutex_lock (&htp_server->fs_obj_ids_lock); + g_hash_table_remove (htp_server->fs_obj_ids, task->token); + pthread_mutex_unlock (&htp_server->fs_obj_ids_lock); + goto out; + } + + result->done = TRUE; +out: + seaf_repo_unref (repo); + free_compute_obj_task(task); +} + +static void +start_fs_obj_id_cb (evhtp_request_t *req, void *arg) +{ + HttpServer *htp_server = arg; + char **parts; + char *repo_id; + gboolean dir_only = FALSE; + json_t *obj; + + const char *server_head = evhtp_kv_find (req->uri->query, "server-head"); + if (server_head == NULL || !is_object_id_valid (server_head)) { + char *error = "Invalid server-head parameter.\n"; + evbuffer_add (req->buffer_out, error, strlen (error)); + evhtp_send_reply (req, EVHTP_RES_BADREQ); + return; + } + + const char *client_head = evhtp_kv_find (req->uri->query, "client-head"); + if (client_head && !is_object_id_valid (client_head)) { + char *error = "Invalid client-head parameter.\n"; + evbuffer_add (req->buffer_out, error, strlen (error)); + evhtp_send_reply (req, EVHTP_RES_BADREQ); + return; + } + + const char *dir_only_arg = evhtp_kv_find (req->uri->query, "dir-only"); + if (dir_only_arg) + dir_only = TRUE; + + parts = g_strsplit (req->uri->path->full + 1, "/", 0); + repo_id = parts[1]; + + int token_status = validate_token (htp_server, req, repo_id, NULL, FALSE); + if (token_status != EVHTP_RES_OK) { + evhtp_send_reply (req, token_status); + goto out; + } + + char uuid[37]; + char *new_token; + gen_uuid_inplace (uuid); + new_token = g_strndup(uuid, FS_ID_LIST_TOKEN_LEN); + + CalObjResult *result = g_new0(CalObjResult, 1); + if (!result) { + evhtp_send_reply (req, EVHTP_RES_SERVERR); + goto out; + } + result->done = FALSE; + + ComputeObjTask *task = g_new0 (ComputeObjTask, 1); + if (!task) { + evhtp_send_reply (req, EVHTP_RES_SERVERR); + goto out; + } + + task->token = new_token; + task->dir_only = dir_only; + task->htp_server = htp_server; + task->repo_id = g_strdup(repo_id); + task->client_head = g_strdup(client_head); + task->server_head = g_strdup(server_head); + + pthread_mutex_lock (&htp_server->fs_obj_ids_lock); + g_hash_table_insert (htp_server->fs_obj_ids, g_strdup(task->token), result); + pthread_mutex_unlock (&htp_server->fs_obj_ids_lock); + g_thread_pool_push (htp_server->compute_fs_obj_id_pool, task, NULL); + obj = json_object (); + json_object_set_new (obj, "token", json_string (new_token)); + + char *json_str = json_dumps (obj, JSON_COMPACT); + evbuffer_add (req->buffer_out, json_str, strlen(json_str)); + evhtp_send_reply (req, EVHTP_RES_OK); + + g_free (json_str); + json_decref (obj); +out: + g_strfreev (parts); +} + +static void +query_fs_obj_id_cb (evhtp_request_t *req, void *arg) +{ + json_t *obj; + const char *token = NULL; + CalObjResult *result = NULL; + char **parts; + char *repo_id = NULL; + HttpServer *htp_server = (HttpServer *)arg; + + parts = g_strsplit (req->uri->path->full + 1, "/", 0); + repo_id = parts[1]; + + int token_status = validate_token (htp_server, req, repo_id, NULL, FALSE); + if (token_status != EVHTP_RES_OK) { + evhtp_send_reply (req, token_status); + goto out; + } + + token = evhtp_kv_find (req->uri->query, "token"); + if (!token || strlen(token)!=FS_ID_LIST_TOKEN_LEN) { + evhtp_send_reply (req, EVHTP_RES_BADREQ); + goto out; + } + + obj = json_object (); + + pthread_mutex_lock (&htp_server->fs_obj_ids_lock); + result = g_hash_table_lookup (htp_server->fs_obj_ids, token); + if (!result) { + pthread_mutex_unlock (&htp_server->fs_obj_ids_lock); + evhtp_send_reply (req, EVHTP_RES_NOTFOUND); + goto out; + } else { + if (!result->done) { + json_object_set_new (obj, "success", json_false()); + } else { + json_object_set_new (obj, "success", json_true()); + } + } + pthread_mutex_unlock (&htp_server->fs_obj_ids_lock); + + json_object_set_new (obj, "token", json_string (token)); + + char *json_str = json_dumps (obj, JSON_COMPACT); + evbuffer_add (req->buffer_out, json_str, strlen(json_str)); + evhtp_send_reply (req, EVHTP_RES_OK); + + g_free (json_str); + +out: + if (obj) + json_decref (obj); + g_strfreev (parts); + return; +} + +static void +retrieve_fs_obj_id_cb (evhtp_request_t *req, void *arg) +{ + char **parts; + const char *token = NULL; + char *repo_id = NULL; + GList *list = NULL; + CalObjResult *result = NULL; + HttpServer *htp_server = (HttpServer *)arg; + + parts = g_strsplit (req->uri->path->full + 1, "/", 0); + repo_id = parts[1]; + + int token_status = validate_token (htp_server, req, repo_id, NULL, FALSE); + if (token_status != EVHTP_RES_OK) { + evhtp_send_reply (req, token_status); + goto out; + } + + token = evhtp_kv_find (req->uri->query, "token"); + if (!token || strlen(token)!=FS_ID_LIST_TOKEN_LEN) { + evhtp_send_reply (req, EVHTP_RES_BADREQ); + goto out; + } + + pthread_mutex_lock (&htp_server->fs_obj_ids_lock); + result = g_hash_table_lookup (htp_server->fs_obj_ids, token); + if (!result) { + pthread_mutex_unlock (&htp_server->fs_obj_ids_lock); + evhtp_send_reply (req, EVHTP_RES_NOTFOUND); + + return; + } + if (!result->done) { + pthread_mutex_unlock (&htp_server->fs_obj_ids_lock); + + char *error = "The cauculation task is not completed.\n"; + evbuffer_add (req->buffer_out, error, strlen(error)); + evhtp_send_reply (req, EVHTP_RES_BADREQ); + return; + } + list = result->list; + pthread_mutex_unlock (&htp_server->fs_obj_ids_lock); + + GList *ptr; + json_t *obj_array = json_array (); + + for (ptr = list; ptr; ptr = ptr->next) { + json_array_append_new (obj_array, json_string (ptr->data)); + g_free (ptr->data); + } + + pthread_mutex_lock (&htp_server->fs_obj_ids_lock); + g_hash_table_remove (htp_server->fs_obj_ids, token); + pthread_mutex_unlock (&htp_server->fs_obj_ids_lock); + + char *obj_list = json_dumps (obj_array, JSON_COMPACT); + evbuffer_add (req->buffer_out, obj_list, strlen (obj_list)); + evhtp_send_reply (req, EVHTP_RES_OK); + + g_free (obj_list); + json_decref (obj_array); + +out: + g_strfreev (parts); + return; +} + static void get_block_cb (evhtp_request_t *req, void *arg) { @@ -2346,6 +2644,18 @@ http_request_init (HttpServerStruct *server) GET_FS_OBJ_ID_REGEX, get_fs_obj_id_cb, priv); + evhtp_set_regex_cb (priv->evhtp, + START_FS_OBJ_ID_REGEX, start_fs_obj_id_cb, + priv); + + evhtp_set_regex_cb (priv->evhtp, + QUERY_FS_OBJ_ID_REGEX, query_fs_obj_id_cb, + priv); + + evhtp_set_regex_cb (priv->evhtp, + RETRIEVE_FS_OBJ_ID_REGEX, retrieve_fs_obj_id_cb, + priv); + evhtp_set_regex_cb (priv->evhtp, BLOCK_OPER_REGEX, block_oper_cb, priv); @@ -2530,6 +2840,13 @@ seaf_http_server_new (struct _SeafileSession *session) server->http_temp_dir = g_build_filename (session->seaf_dir, "httptemp", NULL); + priv->compute_fs_obj_id_pool = g_thread_pool_new (compute_fs_obj_id, NULL, + FS_ID_LIST_MAX_WORKERS, FALSE, NULL); + + priv->fs_obj_ids = g_hash_table_new_full (g_str_hash, g_str_equal, + g_free, free_obj_cal_result); + pthread_mutex_init (&priv->fs_obj_ids_lock, NULL); + server->seaf_session = session; server->priv = priv;