From dd3003a693cae86b6f307aca6a3c896c5b64c280 Mon Sep 17 00:00:00 2001 From: feiniks <36756310+feiniks@users.noreply.github.com> Date: Mon, 13 Jan 2025 16:58:56 +0800 Subject: [PATCH] Catch exception in thread pool when migrate repos (#7320) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Catch exception in thread pool when migrate repos * Add comment and use different exit code * Check exception only --------- Co-authored-by: 杨赫然 --- scripts/migrate-repo.py | 18 ++++++++++++++---- scripts/migrate.py | 11 +++++++---- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/scripts/migrate-repo.py b/scripts/migrate-repo.py index db5319650c..b383a292df 100644 --- a/scripts/migrate-repo.py +++ b/scripts/migrate-repo.py @@ -186,7 +186,7 @@ def migrate_repo(repo_id, orig_storage_id, dest_storage_id): sys.exit(1) for w in workers: - if w.exit_code == 1: + if w.exception: logging.warning(w.exception) api.set_repo_status (repo_id, REPO_STATUS_NORMAL) sys.exit(1) @@ -202,6 +202,7 @@ def migrate_repo(repo_id, orig_storage_id, dest_storage_id): def migrate_repos(orig_storage_id, dest_storage_id): repo_ids = get_repo_ids(orig_storage_id, dest_storage_id) + pending_repos = {} for repo_id in repo_ids: api.set_repo_status (repo_id, REPO_STATUS_READ_ONLY) dtypes = ['commits', 'fs', 'blocks'] @@ -232,10 +233,14 @@ def migrate_repos(orig_storage_id, dest_storage_id): sys.exit(1) for w in workers: - if w.exit_code == 1: + if w.exception: logging.warning(w.exception) - api.set_repo_status (repo_id, REPO_STATUS_NORMAL) - sys.exit(1) + pending_repos[repo_id] = repo_id + + if repo_id in pending_repos: + api.set_repo_status (repo_id, REPO_STATUS_NORMAL) + logging.info('The process of migrating repo [%s] is failed.\n', repo_id) + continue if api.update_repo_storage_id(repo_id, dest_storage_id) < 0: logging.warning('Failed to update repo [%s] storage_id.\n', repo_id) @@ -245,5 +250,10 @@ def migrate_repos(orig_storage_id, dest_storage_id): api.set_repo_status (repo_id, REPO_STATUS_NORMAL) logging.info('The process of migrating repo [%s] is over.\n', repo_id) + if len(pending_repos) != 0: + logging.info('The following repos were not migrated successfully and need to be migrated again:\n') + for r in pending_repos: + logging.info('%s\n', r) + if __name__ == '__main__': main(sys.argv) diff --git a/scripts/migrate.py b/scripts/migrate.py index a4b5a170ff..ee68cac6f3 100755 --- a/scripts/migrate.py +++ b/scripts/migrate.py @@ -16,10 +16,11 @@ from seafobj.objstore_factory import SeafObjStoreFactory logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) class Worker(Thread): - def __init__(self, do_work, task_queue): + def __init__(self, do_work, task_queue, pool): Thread.__init__(self) self.do_work = do_work self.task_queue = task_queue + self.pool = pool def run(self): while True: @@ -29,6 +30,7 @@ class Worker(Thread): break self.do_work(task) except Exception as e: + self.pool.exception = e logging.warning('Failed to execute task: %s' % e) finally: self.task_queue.task_done() @@ -37,11 +39,13 @@ class ThreadPool(object): def __init__(self, do_work, nworker=20): self.do_work = do_work self.nworker = nworker + # The pool's exception will be set when an exception occurs in the worker processing the migration object. + self.exception = None self.task_queue = queue.Queue(maxsize = 2000) def start(self): for i in range(self.nworker): - Worker(self.do_work, self.task_queue).start() + Worker(self.do_work, self.task_queue, self).start() def put_task(self, task): self.task_queue.put(task) @@ -72,7 +76,6 @@ class ObjMigrateWorker(Thread): self.dest_objs = {} self.object_list_file_path = '' self.fd = None - self.exit_code = 0 self.exception = None self.decrypt = decrypt @@ -80,7 +83,6 @@ class ObjMigrateWorker(Thread): try: self._run() except Exception as e: - self.exit_code = 1 self.exception = e def _run(self): @@ -133,6 +135,7 @@ class ObjMigrateWorker(Thread): self.thread_pool.start() self.migrate() self.thread_pool.join() + self.exception = self.thread_pool.exception if self.object_list_file_path: self.fd.close() logging.info('Complete migrate [%s] object' % self.dtype)