1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-04-27 02:51:00 +00:00

Catch exception in thread pool when migrate repos (#7320)

* Catch exception in thread pool when migrate repos

* Add comment and use different exit code

* Check exception only

---------

Co-authored-by: 杨赫然 <heran.yang@seafile.com>
This commit is contained in:
feiniks 2025-01-13 16:58:56 +08:00 committed by GitHub
parent de75571ae6
commit dd3003a693
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 8 deletions

View File

@ -186,7 +186,7 @@ def migrate_repo(repo_id, orig_storage_id, dest_storage_id):
sys.exit(1)
for w in workers:
if w.exit_code == 1:
if w.exception:
logging.warning(w.exception)
api.set_repo_status (repo_id, REPO_STATUS_NORMAL)
sys.exit(1)
@ -202,6 +202,7 @@ def migrate_repo(repo_id, orig_storage_id, dest_storage_id):
def migrate_repos(orig_storage_id, dest_storage_id):
repo_ids = get_repo_ids(orig_storage_id, dest_storage_id)
pending_repos = {}
for repo_id in repo_ids:
api.set_repo_status (repo_id, REPO_STATUS_READ_ONLY)
dtypes = ['commits', 'fs', 'blocks']
@ -232,10 +233,14 @@ def migrate_repos(orig_storage_id, dest_storage_id):
sys.exit(1)
for w in workers:
if w.exit_code == 1:
if w.exception:
logging.warning(w.exception)
api.set_repo_status (repo_id, REPO_STATUS_NORMAL)
sys.exit(1)
pending_repos[repo_id] = repo_id
if repo_id in pending_repos:
api.set_repo_status (repo_id, REPO_STATUS_NORMAL)
logging.info('The process of migrating repo [%s] is failed.\n', repo_id)
continue
if api.update_repo_storage_id(repo_id, dest_storage_id) < 0:
logging.warning('Failed to update repo [%s] storage_id.\n', repo_id)
@ -245,5 +250,10 @@ def migrate_repos(orig_storage_id, dest_storage_id):
api.set_repo_status (repo_id, REPO_STATUS_NORMAL)
logging.info('The process of migrating repo [%s] is over.\n', repo_id)
if len(pending_repos) != 0:
logging.info('The following repos were not migrated successfully and need to be migrated again:\n')
for r in pending_repos:
logging.info('%s\n', r)
if __name__ == '__main__':
main(sys.argv)

View File

@ -16,10 +16,11 @@ from seafobj.objstore_factory import SeafObjStoreFactory
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
class Worker(Thread):
def __init__(self, do_work, task_queue):
def __init__(self, do_work, task_queue, pool):
Thread.__init__(self)
self.do_work = do_work
self.task_queue = task_queue
self.pool = pool
def run(self):
while True:
@ -29,6 +30,7 @@ class Worker(Thread):
break
self.do_work(task)
except Exception as e:
self.pool.exception = e
logging.warning('Failed to execute task: %s' % e)
finally:
self.task_queue.task_done()
@ -37,11 +39,13 @@ class ThreadPool(object):
def __init__(self, do_work, nworker=20):
self.do_work = do_work
self.nworker = nworker
# The pool's exception will be set when an exception occurs in the worker processing the migration object.
self.exception = None
self.task_queue = queue.Queue(maxsize = 2000)
def start(self):
for i in range(self.nworker):
Worker(self.do_work, self.task_queue).start()
Worker(self.do_work, self.task_queue, self).start()
def put_task(self, task):
self.task_queue.put(task)
@ -72,7 +76,6 @@ class ObjMigrateWorker(Thread):
self.dest_objs = {}
self.object_list_file_path = ''
self.fd = None
self.exit_code = 0
self.exception = None
self.decrypt = decrypt
@ -80,7 +83,6 @@ class ObjMigrateWorker(Thread):
try:
self._run()
except Exception as e:
self.exit_code = 1
self.exception = e
def _run(self):
@ -133,6 +135,7 @@ class ObjMigrateWorker(Thread):
self.thread_pool.start()
self.migrate()
self.thread_pool.join()
self.exception = self.thread_pool.exception
if self.object_list_file_path:
self.fd.close()
logging.info('Complete migrate [%s] object' % self.dtype)