mirror of
https://github.com/haiwen/seahub.git
synced 2025-04-27 02:51:00 +00:00
122 lines
4.0 KiB
Python
Executable File
122 lines
4.0 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#coding: utf-8
|
|
|
|
import os
|
|
import sys
|
|
import logging
|
|
from threading import Thread
|
|
import queue
|
|
import rados
|
|
|
|
from seafobj.objstore_factory import SeafObjStoreFactory
|
|
|
|
# Root-logger configuration for this script: INFO and above, each record
# prefixed with its timestamp.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
|
|
|
|
class Worker(Thread):
    """Pool thread that pulls tasks from a shared queue and runs ``do_work`` on each.

    A ``None`` task is the stop sentinel. Exceptions raised by ``do_work`` are
    logged and swallowed so that one failing task does not kill the thread.
    """

    def __init__(self, do_work, task_queue):
        """
        :param do_work: callable invoked with each non-``None`` task.
        :param task_queue: ``queue.Queue`` shared with the owning pool.
        """
        super().__init__()
        self.do_work = do_work
        self.task_queue = task_queue

    def run(self):
        while True:
            # get() stays outside the try: task_done() must only be called
            # for an item that was actually removed from the queue.
            task = self.task_queue.get()
            try:
                if task is None:
                    break
                self.do_work(task)
            except Exception as e:
                logging.warning('Failed to execute task: %s', e)
            finally:
                # Every dequeued item (the stop sentinel included) is matched
                # by exactly one task_done() so Queue.join() can return.
                self.task_queue.task_done()


class ThreadPool(object):
    """Fixed-size pool of ``Worker`` threads fed through a FIFO queue."""

    def __init__(self, do_work, nworker=20):
        """
        :param do_work: callable each worker applies to every queued task.
        :param nworker: number of worker threads started by ``start()``.
        """
        self.do_work = do_work
        self.nworker = nworker
        self.task_queue = queue.Queue()
        # Threads created by start(); join() waits for them to terminate.
        self.workers = []

    def start(self):
        """Spawn ``nworker`` worker threads consuming ``task_queue``."""
        for _ in range(self.nworker):
            worker = Worker(self.do_work, self.task_queue)
            worker.start()
            self.workers.append(worker)

    def put_task(self, task):
        """Queue *task* for processing; must not be ``None`` (the stop sentinel)."""
        self.task_queue.put(task)

    def join(self):
        """Wait for all queued tasks, then stop the workers and wait for them to exit.

        If tasks were queued, ``start()`` must have been called first,
        otherwise the initial ``Queue.join()`` blocks forever.
        """
        self.task_queue.join()
        # One sentinel per worker: each worker exits after consuming one.
        for _ in range(self.nworker):
            self.task_queue.put(None)
        for worker in self.workers:
            worker.join()
|
|
|
|
class Task:
    """A single object to migrate.

    Holds the repository id, the repository version passed through to the
    source store's ``read_obj_raw`` call, and the object id.
    """

    def __init__(self, repo_id, repo_version, obj_id):
        (self.repo_id,
         self.repo_version,
         self.obj_id) = (repo_id, repo_version, obj_id)
|
|
|
|
class ObjMigrateWorker(Thread):
    """Copy every object of one type (``dtype``) from a source store into Ceph.

    The thread walks the source store's object directory, queues one ``Task``
    per object file and lets a ``ThreadPool`` copy them concurrently.
    """

    def __init__(self, orig_obj_factory, dest_obj_factory, dtype):
        """
        :param orig_obj_factory: factory providing the source object store.
        :param dest_obj_factory: factory providing the destination store; its
            store must expose ``ceph_client.ioctx_pool`` (used by do_work).
        :param dtype: object type name, e.g. ``'commits'``, ``'fs'``, ``'blocks'``.
        """
        Thread.__init__(self)
        self.dtype = dtype
        self.orig_store = orig_obj_factory.get_obj_store(dtype)
        self.dest_store = dest_obj_factory.get_obj_store(dtype)
        self.thread_pool = ThreadPool(self.do_work)

    def run(self):
        logging.info('Start to migrate [%s] object', self.dtype)
        self.thread_pool.start()
        try:
            self.migrate()
        finally:
            # Always shut the pool down, even if walking the source tree
            # raises. The workers only exit after consuming the stop sentinels
            # that ThreadPool.join() enqueues; skipping it would leave them
            # blocked on the queue and keep the process alive forever.
            self.thread_pool.join()
        logging.info('Complete migrate [%s] object', self.dtype)

    def do_work(self, task):
        """Copy one object into Ceph unless it already exists there.

        Failures are logged with context and re-raised to the caller. The
        ioctx is always returned to its pool.
        """
        ioctx_pool = self.dest_store.ceph_client.ioctx_pool
        ioctx = ioctx_pool.get_ioctx(task.repo_id)
        try:
            ioctx.stat(task.obj_id)
        except rados.ObjectNotFound:
            # Missing in Ceph: read it from the source store and write it.
            # Exceptions raised inside this handler propagate directly; the
            # sibling `except Exception` below does not catch them.
            try:
                data = self.orig_store.read_obj_raw(task.repo_id, task.repo_version, task.obj_id)
            except Exception as e:
                logging.warning('[%s] Failed to read object %s from repo %s: %s',
                                self.dtype, task.obj_id, task.repo_id, e)
                raise

            try:
                ioctx.write_full(task.obj_id, data)
            except Exception as e:
                logging.warning('[%s] Failed to write object %s of repo %s to Ceph: %s',
                                self.dtype, task.obj_id, task.repo_id, e)
                raise
        except Exception as e:
            logging.warning('[%s] Failed to stat object %s of repo %s in Ceph: %s',
                            self.dtype, task.obj_id, task.repo_id, e)
            raise
        finally:
            ioctx_pool.return_ioctx(ioctx)

    def migrate(self):
        """Queue a Task for every object file under ``<obj_dir>/<repo_id>/<xx>/``.

        The object id is the two-level path with the separator removed (the
        ``<xx>`` directory name followed by the file name). Non-directory
        entries where a directory is expected are skipped instead of aborting
        the whole walk.
        """
        top_path = self.orig_store.obj_dir
        for repo_id in os.listdir(top_path):
            repo_path = os.path.join(top_path, repo_id)
            if not os.path.isdir(repo_path):
                logging.warning('[%s] Skipping non-directory entry %s', self.dtype, repo_path)
                continue
            for spath in os.listdir(repo_path):
                obj_path = os.path.join(repo_path, spath)
                if not os.path.isdir(obj_path):
                    logging.warning('[%s] Skipping non-directory entry %s', self.dtype, obj_path)
                    continue
                for lpath in os.listdir(obj_path):
                    # Repo version is fixed at 1, matching this per-repo
                    # directory layout.
                    self.thread_pool.put_task(Task(repo_id, 1, spath + lpath))
|
|
|
|
def main():
    """Migrate commits, fs and blocks objects from the current store into Ceph.

    Requires ``CEPH_SEAFILE_CENTRAL_CONF_DIR`` to point at the Ceph-backed
    configuration; exits with status 1 when it is unset.
    """
    ceph_conf_dir = os.environ.get('CEPH_SEAFILE_CENTRAL_CONF_DIR')
    if ceph_conf_dir is None:
        logging.warning('CEPH_SEAFILE_CENTRAL_CONF_DIR environment variable is not set.\n')
        sys.exit(1)

    # The source factory is built before SEAFILE_CENTRAL_CONF_DIR is switched.
    # Presumably SeafObjStoreFactory reads that variable at construction time,
    # so the two factories see the original and the Ceph config respectively
    # (TODO confirm against seafobj).
    fs_obj_factory = SeafObjStoreFactory()
    os.environ['SEAFILE_CENTRAL_CONF_DIR'] = ceph_conf_dir
    ceph_obj_factory = SeafObjStoreFactory()

    workers = [ObjMigrateWorker(fs_obj_factory, ceph_obj_factory, dtype)
               for dtype in ('commits', 'fs', 'blocks')]
    for worker in workers:
        worker.start()
    # Return only once every object type has finished migrating.
    for worker in workers:
        worker.join()
|
|
|
|
# Script entry point: run the migration only when executed directly,
# not when this module is imported.
if __name__ == '__main__':
    main()
|