1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-04-27 02:51:00 +00:00
seahub/scripts/migrate.py
feiniks dd3003a693
Catch exception in thread pool when migrate repos (#7320)
* Catch exception in thread pool when migrate repos

* Add comment and use different exit code

* Check exception only

---------

Co-authored-by: 杨赫然 <heran.yang@seafile.com>
2025-01-13 16:58:56 +08:00

247 lines
8.9 KiB
Python
Executable File

#!/usr/bin/env python3
#coding: utf-8
import os
import re
import sys
import random
import logging
import queue
import threading
import argparse
from threading import Thread
from uuid import UUID
from seafobj.objstore_factory import SeafObjStoreFactory
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
class Worker(Thread):
def __init__(self, do_work, task_queue, pool):
Thread.__init__(self)
self.do_work = do_work
self.task_queue = task_queue
self.pool = pool
def run(self):
while True:
try:
task = self.task_queue.get()
if task is None:
break
self.do_work(task)
except Exception as e:
self.pool.exception = e
logging.warning('Failed to execute task: %s' % e)
finally:
self.task_queue.task_done()
class ThreadPool(object):
def __init__(self, do_work, nworker=20):
self.do_work = do_work
self.nworker = nworker
# The pool's exception will be set when an exception occurs in the worker processing the migration object.
self.exception = None
self.task_queue = queue.Queue(maxsize = 2000)
def start(self):
for i in range(self.nworker):
Worker(self.do_work, self.task_queue, self).start()
def put_task(self, task):
self.task_queue.put(task)
def join(self):
self.task_queue.join()
# notify all thread to stop
for i in range(self.nworker):
self.task_queue.put(None)
class Task(object):
def __init__(self, repo_id, repo_version, obj_id):
self.repo_id = repo_id
self.repo_version = repo_version
self.obj_id = obj_id
class ObjMigrateWorker(Thread):
def __init__(self, orig_store, dest_store, dtype, repo_id = None, decrypt = False):
Thread.__init__(self)
self.lock = threading.Lock()
self.dtype = dtype
self.orig_store = orig_store
self.dest_store = dest_store
self.repo_id = repo_id
self.thread_pool = ThreadPool(self.do_work)
self.write_count = 0
self.fetch_count = 0
self.dest_objs = {}
self.object_list_file_path = ''
self.fd = None
self.exception = None
self.decrypt = decrypt
def run(self):
try:
self._run()
except Exception as e:
self.exception = e
def _run(self):
if 'OBJECT_LIST_FILE_PATH' in os.environ:
if self.repo_id:
self.object_list_file_path = '.'.join(['_'.join([os.environ['OBJECT_LIST_FILE_PATH'], self.repo_id]), self.dtype])
else:
self.object_list_file_path = '.'.join([os.environ['OBJECT_LIST_FILE_PATH'], self.dtype])
if self.object_list_file_path and \
os.path.exists(self.object_list_file_path) and \
os.path.getsize(self.object_list_file_path) > 0:
logging.info('Start to load [%s] destination object from file' % self.dtype)
with open(self.object_list_file_path, 'r') as f:
for line in f:
obj = line.rstrip('\n').split('/', 1)
if self.invalid_obj(obj):
continue
self.fetch_count += 1
if obj[0] in self.dest_objs:
self.dest_objs[obj[0]].add(obj[1])
else:
self.dest_objs[obj[0]] = set()
self.dest_objs[obj[0]].add(obj[1])
else:
logging.info('Start to fetch [%s] object from destination' % self.dtype)
if self.object_list_file_path:
f = open(self.object_list_file_path, 'a')
for obj in self.dest_store.list_objs(self.repo_id):
if self.invalid_obj(obj):
continue
self.fetch_count += 1
if obj[0] in self.dest_objs:
self.dest_objs[obj[0]].add(obj[1])
else:
self.dest_objs[obj[0]] = set()
self.dest_objs[obj[0]].add(obj[1])
if self.object_list_file_path:
f.write('/'.join(obj[:2]) + '\n')
if self.fetch_count % 100 == 0:
f.flush()
if self.object_list_file_path:
f.close()
logging.info('[%s] [%d] objects exist in destination' % (self.dtype, self.fetch_count))
if self.object_list_file_path:
self.fd = open(self.object_list_file_path, 'a')
logging.info('Start to migrate [%s] object' % self.dtype)
self.thread_pool.start()
self.migrate()
self.thread_pool.join()
self.exception = self.thread_pool.exception
if self.object_list_file_path:
self.fd.close()
logging.info('Complete migrate [%s] object' % self.dtype)
def do_work(self, task):
try:
exists = False
if task.repo_id in self.dest_objs:
if task.obj_id in self.dest_objs[task.repo_id]:
exists = True
except Exception as e:
logging.warning('[%s] Failed to check object %s existence from repo %s: %s' % (self.dtype, task.obj_id, task.repo_id, e))
raise
if not exists:
try:
if self.decrypt:
data = self.orig_store.read_decrypted(task.repo_id, task.repo_version, task.obj_id)
else:
data = self.orig_store.read_obj_raw(task.repo_id, task.repo_version, task.obj_id)
except Exception as e:
logging.warning('[%s] Failed to read object %s from repo %s: %s' % (self.dtype, task.obj_id, task.repo_id, e))
raise
try:
self.dest_store.write_obj(data, task.repo_id, task.obj_id)
self.write_count += 1
if self.write_count % 100 == 0:
logging.info('[%s] task: %s objects written to destination.', self.dtype, self.write_count)
if self.object_list_file_path:
with self.lock:
self.fd.write('/'.join([task.repo_id, task.obj_id]) + '\n')
if self.write_count % 100 == 0:
self.fd.flush()
except Exception as e:
logging.warning('[%s] Failed to write object %s from repo %s: %s' % (self.dtype, task.obj_id, task.repo_id, e))
raise
def put_task(self, objs):
if self.dest_store.get_name() != "filesystem storage backend":
random.shuffle(objs)
for obj in objs:
repo_id,obj_id=obj.split('/')
task = Task(repo_id, 1, obj_id)
self.thread_pool.put_task(task)
def migrate(self):
try:
obj_list = self.orig_store.list_objs(self.repo_id)
except Exception as e:
logging.warning('[%s] Failed to list all objects: %s' % (self.dtype, e))
raise
objs = []
for obj in obj_list:
if self.invalid_obj(obj):
continue
repo_id = obj[0]
obj_id = obj[1]
objs.append(repo_id+"/"+obj_id)
if len(objs) >= 1000000:
self.put_task(objs)
objs = []
self.put_task(objs)
def invalid_obj(self, obj):
if len(obj) < 2:
return True
try:
UUID(obj[0], version = 4)
except ValueError:
return True
if len(obj[1]) != 40 or not re.match('\A[0-9a-f]+\Z', obj[1]):
return True
return False
def main(argv):
parser = argparse.ArgumentParser()
parser.add_argument('--decrypt', action='store_true', help='decrypt data from source storage and write to destination in plain text')
parser.add_argument('config_dir', help='The dir where the destination configuration file exists')
args = parser.parse_args()
decrypt = args.decrypt
try:
orig_obj_factory = SeafObjStoreFactory()
os.environ['SEAFILE_CENTRAL_CONF_DIR'] = os.environ['DEST_SEAFILE_CENTRAL_CONF_DIR']
except KeyError:
logging.warning('DEST_SEAFILE_CENTRAL_CONF_DIR environment variable is not set.\n')
sys.exit()
dest_obj_factory = SeafObjStoreFactory()
dtypes = ['commits', 'fs', 'blocks']
for dtype in dtypes:
orig_store = orig_obj_factory.get_obj_store(dtype)
dest_store = dest_obj_factory.get_obj_store(dtype)
if orig_store.get_name() == dest_store.get_name() and \
orig_store.get_container_name() == dest_store.get_container_name():
logging.warning('%s does not support migration between identical storage' % (orig_store.get_name()))
sys.exit()
ObjMigrateWorker(orig_store, dest_store, dtype, decrypt=decrypt).start()
if __name__ == '__main__':
main(sys.argv)