mirror of
				https://github.com/haiwen/seahub.git
				synced 2025-10-22 11:43:33 +00:00 
			
		
		
		
	* Catch exception in thread pool when migrate repos * Add comment and use different exit code * Check exception only --------- Co-authored-by: 杨赫然 <heran.yang@seafile.com>
		
			
				
	
	
		
			247 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			247 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| #coding: utf-8
 | |
| 
 | |
| import os
 | |
| import re
 | |
| import sys
 | |
| import random
 | |
| import logging
 | |
| import queue
 | |
| import threading
 | |
| import argparse
 | |
| from threading import Thread
 | |
| from uuid import UUID
 | |
| from seafobj.objstore_factory import SeafObjStoreFactory
 | |
| 
 | |
| logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
 | |
| 
 | |
| class Worker(Thread):
 | |
|     def __init__(self, do_work, task_queue, pool):
 | |
|         Thread.__init__(self)
 | |
|         self.do_work = do_work
 | |
|         self.task_queue = task_queue
 | |
|         self.pool = pool
 | |
| 
 | |
|     def run(self):
 | |
|         while True:
 | |
|             try:
 | |
|                 task = self.task_queue.get()
 | |
|                 if task is None:
 | |
|                     break
 | |
|                 self.do_work(task)
 | |
|             except Exception as e:
 | |
|                 self.pool.exception = e
 | |
|                 logging.warning('Failed to execute task: %s' % e)
 | |
|             finally:
 | |
|                 self.task_queue.task_done()
 | |
| 
 | |
| class ThreadPool(object):
 | |
|     def __init__(self, do_work, nworker=20):
 | |
|         self.do_work = do_work
 | |
|         self.nworker = nworker
 | |
|         # The pool's exception will be set when an exception occurs in the worker processing the migration object.
 | |
|         self.exception = None
 | |
|         self.task_queue = queue.Queue(maxsize = 2000)
 | |
| 
 | |
|     def start(self):
 | |
|         for i in range(self.nworker):
 | |
|             Worker(self.do_work, self.task_queue, self).start()
 | |
| 
 | |
|     def put_task(self, task):
 | |
|         self.task_queue.put(task)
 | |
| 
 | |
|     def join(self):
 | |
|         self.task_queue.join()
 | |
|         # notify all thread to stop
 | |
|         for i in range(self.nworker):
 | |
|             self.task_queue.put(None)
 | |
| 
 | |
| class Task(object):
 | |
|     def __init__(self, repo_id, repo_version, obj_id):
 | |
|         self.repo_id = repo_id
 | |
|         self.repo_version = repo_version
 | |
|         self.obj_id = obj_id
 | |
| 
 | |
| class ObjMigrateWorker(Thread):
 | |
|     def __init__(self, orig_store, dest_store, dtype, repo_id = None, decrypt = False):
 | |
|         Thread.__init__(self)
 | |
|         self.lock = threading.Lock()
 | |
|         self.dtype = dtype
 | |
|         self.orig_store = orig_store
 | |
|         self.dest_store = dest_store
 | |
|         self.repo_id = repo_id
 | |
|         self.thread_pool = ThreadPool(self.do_work)
 | |
|         self.write_count = 0
 | |
|         self.fetch_count = 0
 | |
|         self.dest_objs = {}
 | |
|         self.object_list_file_path = ''
 | |
|         self.fd = None
 | |
|         self.exception = None
 | |
|         self.decrypt = decrypt
 | |
|     
 | |
|     def run(self):
 | |
|         try:
 | |
|             self._run()
 | |
|         except Exception as e:
 | |
|             self.exception = e
 | |
| 
 | |
|     def _run(self):
 | |
|         if 'OBJECT_LIST_FILE_PATH' in os.environ:
 | |
|             if self.repo_id:
 | |
|                 self.object_list_file_path = '.'.join(['_'.join([os.environ['OBJECT_LIST_FILE_PATH'], self.repo_id]), self.dtype])
 | |
|             else:
 | |
|                 self.object_list_file_path = '.'.join([os.environ['OBJECT_LIST_FILE_PATH'], self.dtype])
 | |
| 
 | |
|         if self.object_list_file_path and \
 | |
|         os.path.exists(self.object_list_file_path) and \
 | |
|         os.path.getsize(self.object_list_file_path) > 0:
 | |
|             logging.info('Start to load [%s] destination object from file' % self.dtype)
 | |
|             with open(self.object_list_file_path, 'r') as f:
 | |
|                 for line in f:
 | |
|                     obj = line.rstrip('\n').split('/', 1)
 | |
|                     if self.invalid_obj(obj):
 | |
|                         continue
 | |
|                     self.fetch_count += 1
 | |
|                     if obj[0] in self.dest_objs:
 | |
|                         self.dest_objs[obj[0]].add(obj[1])
 | |
|                     else:
 | |
|                         self.dest_objs[obj[0]] = set()
 | |
|                         self.dest_objs[obj[0]].add(obj[1])
 | |
| 
 | |
|         else:
 | |
|             logging.info('Start to fetch [%s] object from destination' % self.dtype)
 | |
|             if self.object_list_file_path:
 | |
|                 f = open(self.object_list_file_path, 'a')
 | |
|             for obj in self.dest_store.list_objs(self.repo_id):
 | |
|                 if self.invalid_obj(obj):
 | |
|                     continue
 | |
|                 self.fetch_count += 1
 | |
|                 if obj[0] in self.dest_objs:
 | |
|                     self.dest_objs[obj[0]].add(obj[1])
 | |
|                 else:
 | |
|                     self.dest_objs[obj[0]] = set()
 | |
|                     self.dest_objs[obj[0]].add(obj[1])
 | |
|                 if self.object_list_file_path:
 | |
|                     f.write('/'.join(obj[:2]) + '\n')
 | |
|                     if self.fetch_count % 100 == 0:
 | |
|                         f.flush()
 | |
|             if self.object_list_file_path:
 | |
|                 f.close()
 | |
|         logging.info('[%s] [%d] objects exist in destination' % (self.dtype, self.fetch_count))
 | |
| 
 | |
|         if self.object_list_file_path:
 | |
|             self.fd = open(self.object_list_file_path, 'a')
 | |
|         logging.info('Start to migrate [%s] object' % self.dtype)
 | |
|         self.thread_pool.start()
 | |
|         self.migrate()
 | |
|         self.thread_pool.join()
 | |
|         self.exception = self.thread_pool.exception
 | |
|         if self.object_list_file_path:
 | |
|             self.fd.close()
 | |
|         logging.info('Complete migrate [%s] object' % self.dtype)
 | |
| 
 | |
|     def do_work(self, task):
 | |
|         try:
 | |
|             exists = False
 | |
|             if task.repo_id in self.dest_objs:
 | |
|                 if task.obj_id in self.dest_objs[task.repo_id]:
 | |
|                     exists = True
 | |
| 
 | |
|         except Exception as e:
 | |
|             logging.warning('[%s] Failed to check object %s existence from repo %s: %s' % (self.dtype, task.obj_id, task.repo_id, e))
 | |
|             raise
 | |
| 
 | |
|         if not exists:
 | |
|             try:
 | |
|                 if self.decrypt:
 | |
|                     data = self.orig_store.read_decrypted(task.repo_id, task.repo_version, task.obj_id)
 | |
|                 else:
 | |
|                     data = self.orig_store.read_obj_raw(task.repo_id, task.repo_version, task.obj_id)
 | |
|             except Exception as e:
 | |
|                 logging.warning('[%s] Failed to read object %s from repo %s: %s' % (self.dtype, task.obj_id, task.repo_id, e))
 | |
|                 raise
 | |
| 
 | |
|             try:
 | |
|                 self.dest_store.write_obj(data, task.repo_id, task.obj_id)
 | |
|                 self.write_count += 1
 | |
|                 if self.write_count % 100 == 0:
 | |
|                     logging.info('[%s] task: %s objects written to destination.', self.dtype, self.write_count)
 | |
| 
 | |
|                 if self.object_list_file_path:
 | |
|                     with self.lock:
 | |
|                         self.fd.write('/'.join([task.repo_id, task.obj_id]) + '\n')
 | |
|                         if self.write_count % 100 == 0:
 | |
|                             self.fd.flush()
 | |
|             except Exception as e:
 | |
|                 logging.warning('[%s] Failed to write object %s from repo %s: %s' % (self.dtype, task.obj_id, task.repo_id, e))
 | |
|                 raise
 | |
| 
 | |
|     def put_task(self, objs):
 | |
|         if self.dest_store.get_name() != "filesystem storage backend":
 | |
|             random.shuffle(objs)
 | |
|         for obj in objs:
 | |
|             repo_id,obj_id=obj.split('/')
 | |
|             task = Task(repo_id, 1, obj_id)
 | |
|             self.thread_pool.put_task(task)
 | |
| 
 | |
|     def migrate(self):
 | |
|         try:
 | |
|             obj_list = self.orig_store.list_objs(self.repo_id)
 | |
|         except Exception as e:
 | |
|             logging.warning('[%s] Failed to list all objects: %s' % (self.dtype, e))
 | |
|             raise
 | |
| 
 | |
|         objs = []
 | |
|         for obj in obj_list:
 | |
|             if self.invalid_obj(obj):
 | |
|                 continue
 | |
|             repo_id = obj[0]
 | |
|             obj_id = obj[1]
 | |
|             objs.append(repo_id+"/"+obj_id)
 | |
|             if len(objs) >= 1000000:
 | |
|                 self.put_task(objs)
 | |
|                 objs = []
 | |
| 
 | |
|         self.put_task(objs)
 | |
| 
 | |
|     def invalid_obj(self, obj):
 | |
|         if len(obj) < 2:
 | |
|             return True
 | |
|         try:
 | |
|             UUID(obj[0], version = 4)
 | |
|         except ValueError:
 | |
|             return True
 | |
|         if len(obj[1]) != 40 or not re.match('\A[0-9a-f]+\Z', obj[1]):
 | |
|             return True
 | |
|         return False
 | |
| 
 | |
| def main(argv):
 | |
|     parser = argparse.ArgumentParser()
 | |
|     parser.add_argument('--decrypt', action='store_true', help='decrypt data from source storage and write to destination in plain text')
 | |
|     parser.add_argument('config_dir', help='The dir where the destination configuration file exists')
 | |
|     args = parser.parse_args()
 | |
| 
 | |
|     decrypt = args.decrypt
 | |
| 
 | |
|     try:
 | |
|         orig_obj_factory = SeafObjStoreFactory()
 | |
|         os.environ['SEAFILE_CENTRAL_CONF_DIR'] = os.environ['DEST_SEAFILE_CENTRAL_CONF_DIR']
 | |
|     except KeyError:
 | |
|         logging.warning('DEST_SEAFILE_CENTRAL_CONF_DIR environment variable is not set.\n')
 | |
|         sys.exit()
 | |
| 
 | |
|     dest_obj_factory = SeafObjStoreFactory()
 | |
| 
 | |
|     dtypes = ['commits', 'fs', 'blocks']
 | |
|     for dtype in dtypes:
 | |
|         orig_store = orig_obj_factory.get_obj_store(dtype)
 | |
|         dest_store = dest_obj_factory.get_obj_store(dtype)
 | |
|         if orig_store.get_name() == dest_store.get_name() and \
 | |
|            orig_store.get_container_name() == dest_store.get_container_name():
 | |
|             logging.warning('%s does not support migration between identical storage' % (orig_store.get_name()))
 | |
|             sys.exit()
 | |
|         ObjMigrateWorker(orig_store, dest_store, dtype, decrypt=decrypt).start()
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     main(sys.argv)
 |