mirror of
				https://github.com/haiwen/seahub.git
				synced 2025-10-25 22:27:39 +00:00 
			
		
		
		
	
		
			
	
	
		
			122 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			122 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | #!/usr/bin/env python | ||
|  | #coding: utf-8 | ||
|  | 
 | ||
|  | import os | ||
|  | import sys | ||
|  | import logging | ||
|  | from threading import Thread | ||
|  | import queue | ||
|  | import rados | ||
|  | 
 | ||
|  | from seafobj.objstore_factory import SeafObjStoreFactory | ||
|  | 
 | ||
# Configure the root logger: INFO and above, each line prefixed with a timestamp.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
|  | 
 | ||
class Worker(Thread):
    """Thread that repeatedly takes a task off ``task_queue`` and runs ``do_work`` on it.

    A ``None`` item is the stop signal. Every item taken from the queue,
    including the ``None`` sentinel and items whose processing failed, is
    acknowledged with ``task_done()`` so that ``task_queue.join()`` can return.
    Exceptions raised by ``do_work`` are logged and do not stop the thread.
    """

    def __init__(self, do_work, task_queue):
        super().__init__()
        self.task_queue = task_queue
        self.do_work = do_work

    def run(self):
        finished = False
        while not finished:
            try:
                item = self.task_queue.get()
                finished = item is None
                if not finished:
                    self.do_work(item)
            except Exception as err:
                logging.warning('Failed to execute task: %s' % err)
            finally:
                # Acknowledge every dequeued item, sentinel included.
                self.task_queue.task_done()
|  | 
 | ||
class ThreadPool(object):
    """Fixed-size pool of ``Worker`` threads sharing one FIFO task queue.

    Each task passed to ``put_task`` is handed to ``do_work`` by one of the
    ``nworker`` workers. ``join`` waits until every queued task has been
    processed, then posts one ``None`` stop sentinel per worker.
    """

    def __init__(self, do_work, nworker=20):
        self.task_queue = queue.Queue()
        self.nworker = nworker
        self.do_work = do_work

    def start(self):
        """Create and start ``nworker`` workers bound to the shared queue."""
        for _ in range(self.nworker):
            worker = Worker(self.do_work, self.task_queue)
            worker.start()

    def put_task(self, task):
        """Enqueue ``task`` for the next available worker."""
        self.task_queue.put(task)

    def join(self):
        """Block until all queued tasks are done, then signal the workers to stop."""
        self.task_queue.join()
        # A worker exits after consuming one None, so post one per worker.
        sentinels = [None] * self.nworker
        for sentinel in sentinels:
            self.task_queue.put(sentinel)
|  | 
 | ||
class Task(object):
    """One object to migrate: its repo id, repo version, and object id.

    The repo version is forwarded to the source store's ``read_obj_raw``.
    """

    def __init__(self, repo_id, repo_version, obj_id):
        self.repo_id, self.repo_version, self.obj_id = repo_id, repo_version, obj_id
|  | 
 | ||
class ObjMigrateWorker(Thread):
    """Thread that copies every object of one type (``dtype``) into Ceph.

    Objects are discovered by walking the source store's ``obj_dir``. Each one
    is queued on an internal ``ThreadPool``, whose workers run ``do_work`` to
    copy it to the destination store unless it is already there.
    """

    def __init__(self, orig_obj_factory, dest_obj_factory, dtype):
        Thread.__init__(self)
        self.dtype = dtype
        self.orig_store = orig_obj_factory.get_obj_store(dtype)
        self.dest_store = dest_obj_factory.get_obj_store(dtype)
        self.thread_pool = ThreadPool(self.do_work)

    def run(self):
        """Start the pool, enqueue all objects, and wait for the pool to drain.

        ``thread_pool.join()`` runs in ``finally`` so the stop sentinels are
        always posted. Otherwise, if ``migrate()`` raised (e.g. ``obj_dir`` is
        missing), the pool's worker threads would block on the queue forever
        and keep the process alive. The exception still propagates afterwards,
        and the completion message is logged only on success.
        """
        logging.info('Start to migrate [%s] object' % self.dtype)
        self.thread_pool.start()
        try:
            self.migrate()
        finally:
            self.thread_pool.join()
        logging.info('Complete migrate [%s] object' % self.dtype)

    def do_work(self, task):
        """Copy one object into Ceph unless an object with that id already exists.

        Runs on the pool's worker threads. Stat, read and write failures are
        logged and re-raised; the calling ``Worker`` catches and logs them.
        """
        # I/O context from the destination store's pool, keyed by repo id;
        # it is always handed back in ``finally``.
        ioctx = self.dest_store.ceph_client.ioctx_pool.get_ioctx(task.repo_id)
        try:
            # A successful stat means the object is already present: nothing to do.
            ioctx.stat(task.obj_id)
        except rados.ObjectNotFound:
            try:
                data = self.orig_store.read_obj_raw(task.repo_id, task.repo_version, task.obj_id)
            except Exception as e:
                logging.warning('[%s] Failed to read object %s from repo %s: %s' % (self.dtype, task.obj_id, task.repo_id, e))
                raise

            try:
                ioctx.write_full(task.obj_id, data)
            except Exception as e:
                logging.warning('[%s] Failed to write object %s of repo %s to Ceph: %s' % (self.dtype, task.obj_id, task.repo_id, e))
                raise
        except Exception as e:
            # Only stat() failures other than ObjectNotFound reach this clause;
            # exceptions re-raised inside the ObjectNotFound handler above
            # propagate past it.
            logging.warning('[%s] Failed to stat object %s of repo %s in Ceph: %s' % (self.dtype, task.obj_id, task.repo_id, e))
            raise
        finally:
            self.dest_store.ceph_client.ioctx_pool.return_ioctx(ioctx)

    def migrate(self):
        """Queue one ``Task`` per object file found under the source ``obj_dir``.

        The walk expects ``<obj_dir>/<repo_id>/<prefix>/<rest>``, and the object
        id is ``<prefix> + <rest>``. Non-directory entries at the repo or
        prefix level are skipped instead of aborting the whole walk.
        """
        top_path = self.orig_store.obj_dir
        for repo_id in os.listdir(top_path):
            repo_path = os.path.join(top_path, repo_id)
            if not os.path.isdir(repo_path):
                continue
            for spath in os.listdir(repo_path):
                obj_path = os.path.join(repo_path, spath)
                if not os.path.isdir(obj_path):
                    continue
                for lpath in os.listdir(obj_path):
                    obj_id = spath + lpath
                    # repo_version is hard-coded to 1; presumably only version-1
                    # repos use this per-repo directory layout -- confirm.
                    self.thread_pool.put_task(Task(repo_id, 1, obj_id))
|  | 
 | ||
def main():
    """Migrate commits, fs and blocks objects from the source store to Ceph.

    Two ``SeafObjStoreFactory`` instances are built:
    - the source factory, before ``SEAFILE_CENTRAL_CONF_DIR`` is touched;
    - the destination factory, after that variable is set to the value of
      ``CEPH_SEAFILE_CENTRAL_CONF_DIR``.
    Presumably the factory reads ``SEAFILE_CENTRAL_CONF_DIR`` at construction,
    so this order matters; confirm against seafobj.

    One ``ObjMigrateWorker`` thread runs per object type, and this function
    waits for all of them.

    Exits with status 1 if ``CEPH_SEAFILE_CENTRAL_CONF_DIR`` is not set.
    """
    # Check the variable on its own so a KeyError from elsewhere is not
    # misreported as a missing variable.
    ceph_conf_dir = os.environ.get('CEPH_SEAFILE_CENTRAL_CONF_DIR')
    if ceph_conf_dir is None:
        logging.warning('CEPH_SEAFILE_CENTRAL_CONF_DIR environment variable is not set.\n')
        # Non-zero status so callers and scripts see the failure.
        sys.exit(1)

    # Must be built before SEAFILE_CENTRAL_CONF_DIR is repointed below.
    fs_obj_factory = SeafObjStoreFactory()
    os.environ['SEAFILE_CENTRAL_CONF_DIR'] = ceph_conf_dir
    ceph_obj_factory = SeafObjStoreFactory()

    dtypes = ['commits', 'fs', 'blocks']
    workers = [ObjMigrateWorker(fs_obj_factory, ceph_obj_factory, dtype) for dtype in dtypes]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
|  | 
 | ||
# Run the migration only when executed as a script, not when imported.
if __name__ == '__main__':
    main()