mirror of
				https://github.com/haiwen/seahub.git
				synced 2025-10-26 14:41:45 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			122 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			122 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python
 | |
| #coding: utf-8
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import logging
 | |
| from threading import Thread
 | |
| import queue
 | |
| import rados
 | |
| 
 | |
| from seafobj.objstore_factory import SeafObjStoreFactory
 | |
| 
 | |
# Configure the root logger once at import time: timestamped messages,
# INFO level and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(message)s',
)
 | |
| 
 | |
class Worker(Thread):
    """Thread that pulls tasks off a shared queue and runs a callback on each.

    A ``None`` item taken from the queue is the stop signal: the thread
    acknowledges it and leaves its loop.
    """

    def __init__(self, do_work, task_queue):
        """Store the callback and the queue this worker consumes from.

        do_work: callable invoked with each task taken from the queue.
        task_queue: queue object providing ``get()`` and ``task_done()``.
        """
        super().__init__()
        self.do_work = do_work
        self.task_queue = task_queue

    def run(self):
        """Process queue items until a ``None`` sentinel is received.

        Exceptions raised while handling an item are logged as warnings and
        do not stop the worker. Every item taken, including the sentinel,
        is acknowledged with ``task_done()``.
        """
        while True:
            try:
                item = self.task_queue.get()
                if item is None:
                    # Stop signal; the ``finally`` clause still runs.
                    return
                self.do_work(item)
            except Exception as err:
                logging.warning('Failed to execute task: %s', err)
            finally:
                self.task_queue.task_done()
 | |
| 
 | |
class ThreadPool(object):
    """Fixed-size group of ``Worker`` threads fed from a single queue."""

    def __init__(self, do_work, nworker=20):
        """Remember the task callback and worker count; create the queue.

        do_work: callable each worker invokes with a queued task.
        nworker: number of worker threads to launch in ``start()``.
        """
        self.do_work = do_work
        self.nworker = nworker
        self.task_queue = queue.Queue()

    def start(self):
        """Create and start ``nworker`` worker threads."""
        for _ in range(self.nworker):
            worker = Worker(self.do_work, self.task_queue)
            worker.start()

    def put_task(self, task):
        """Enqueue one task for the workers."""
        self.task_queue.put(task)

    def join(self):
        """Wait for all queued tasks to be acknowledged, then stop workers.

        After the queue has drained, one ``None`` sentinel per worker is
        enqueued so that each worker leaves its loop.
        """
        pending = self.task_queue
        pending.join()
        # One stop sentinel for every worker thread.
        for _ in range(self.nworker):
            pending.put(None)
 | |
| 
 | |
class Task(object):
    """Identifies a single object to migrate.

    Holds the repository id, the repository version and the object id
    exactly as they were passed in.
    """

    def __init__(self, repo_id, repo_version, obj_id):
        self.repo_id, self.repo_version, self.obj_id = (
            repo_id,
            repo_version,
            obj_id,
        )
 | |
| 
 | |
class ObjMigrateWorker(Thread):
    """Thread that migrates all objects of one data type into Ceph.

    The source store is walked on disk (via its ``obj_dir``), one task is
    queued per object found, and a thread pool copies each object that is
    not already present in the destination Ceph pool.
    """

    def __init__(self, orig_obj_factory, dest_obj_factory, dtype):
        """Resolve the source/destination stores for ``dtype``.

        orig_obj_factory: factory providing ``get_obj_store(dtype)`` for the
            store objects are read from.
        dest_obj_factory: factory providing ``get_obj_store(dtype)`` for the
            Ceph store objects are written to.
        dtype: object type name passed to both factories and used in logs.
        """
        super().__init__()
        self.dtype = dtype
        self.orig_store = orig_obj_factory.get_obj_store(dtype)
        self.dest_store = dest_obj_factory.get_obj_store(dtype)
        self.thread_pool = ThreadPool(self.do_work)

    def run(self):
        """Start the pool, enqueue every object, then shut the pool down.

        The pool is always joined, even when enumerating the source
        directory fails. Without this, the worker threads would never
        receive their stop sentinels and the process would hang. The
        completion message is only logged when enumeration succeeded.
        """
        logging.info('Start to migrate [%s] object' % self.dtype)
        self.thread_pool.start()
        try:
            self.migrate()
        finally:
            # Drain queued tasks and tell every worker to stop, whether or
            # not migrate() raised.
            self.thread_pool.join()
        logging.info('Complete migrate [%s] object' % self.dtype)

    def do_work(self, task):
        """Copy one object to Ceph unless it already exists there.

        task: object with ``repo_id``, ``repo_version`` and ``obj_id``.

        Any failure (stat, read or write) is logged with context and then
        re-raised to the caller. The ioctx obtained from the destination
        pool is always returned to it.
        """
        ioctx_pool = self.dest_store.ceph_client.ioctx_pool
        ioctx = ioctx_pool.get_ioctx(task.repo_id)
        try:
            # stat() succeeding means the object is already in Ceph; nothing
            # more to do in that case.
            ioctx.stat(task.obj_id)
        except rados.ObjectNotFound:
            try:
                data = self.orig_store.read_obj_raw(
                    task.repo_id, task.repo_version, task.obj_id)
            except Exception as e:
                logging.warning(
                    '[%s] Failed to read object %s from repo %s: %s',
                    self.dtype, task.obj_id, task.repo_id, e)
                raise

            try:
                ioctx.write_full(task.obj_id, data)
            except Exception as e:
                logging.warning(
                    '[%s] Failed to write object %s of repo %s to Ceph: %s',
                    self.dtype, task.obj_id, task.repo_id, e)
                raise
        except Exception as e:
            logging.warning(
                '[%s] Failed to stat object %s of repo %s in Ceph: %s',
                self.dtype, task.obj_id, task.repo_id, e)
            raise
        finally:
            ioctx_pool.return_ioctx(ioctx)

    def migrate(self):
        """Walk the source object directory and queue a task per object.

        The layout walked is ``obj_dir/<repo_id>/<spath>/<lpath>``; the
        object id is ``spath + lpath``. Entries that are not directories at
        the repo or prefix level are skipped with a warning instead of
        aborting the whole walk.
        """
        top_path = self.orig_store.obj_dir
        for repo_id in os.listdir(top_path):
            repo_path = os.path.join(top_path, repo_id)
            if not os.path.isdir(repo_path):
                logging.warning('[%s] Skipping non-directory entry %s',
                                self.dtype, repo_path)
                continue
            for spath in os.listdir(repo_path):
                obj_path = os.path.join(repo_path, spath)
                if not os.path.isdir(obj_path):
                    logging.warning('[%s] Skipping non-directory entry %s',
                                    self.dtype, obj_path)
                    continue
                for lpath in os.listdir(obj_path):
                    obj_id = spath + lpath
                    # NOTE(review): repo version is hard-coded to 1 for
                    # every task; confirm this is valid for all repos.
                    self.thread_pool.put_task(Task(repo_id, 1, obj_id))
 | |
| 
 | |
def main():
    """Start one migration thread per object type (commits, fs, blocks).

    Requires the ``CEPH_SEAFILE_CENTRAL_CONF_DIR`` environment variable.
    When it is missing, a warning is logged and the process exits with
    status 1. Otherwise the source factory is created first, then
    ``SEAFILE_CENTRAL_CONF_DIR`` is pointed at the Ceph configuration
    directory before the destination factory is created.
    """
    # Only the environment lookup is treated as the "variable not set"
    # error, so a KeyError raised inside SeafObjStoreFactory() is no longer
    # misreported as a missing variable.
    ceph_conf_dir = os.environ.get('CEPH_SEAFILE_CENTRAL_CONF_DIR')
    if ceph_conf_dir is None:
        logging.warning('CEPH_SEAFILE_CENTRAL_CONF_DIR environment variable is not set.\n')
        # Non-zero status so callers can detect the failure.
        sys.exit(1)

    # Order matters: the source factory must be built before the config
    # directory is switched. Presumably SeafObjStoreFactory reads
    # SEAFILE_CENTRAL_CONF_DIR when constructed -- TODO confirm.
    fs_obj_factory = SeafObjStoreFactory()
    os.environ['SEAFILE_CENTRAL_CONF_DIR'] = ceph_conf_dir
    ceph_obj_factory = SeafObjStoreFactory()

    for dtype in ('commits', 'fs', 'blocks'):
        ObjMigrateWorker(fs_obj_factory, ceph_obj_factory, dtype).start()


if __name__ == '__main__':
    main()
 |