diff --git a/scripts/migrate-to-ceph.sh b/scripts/migrate-to-ceph.sh new file mode 100755 index 0000000..d49684f --- /dev/null +++ b/scripts/migrate-to-ceph.sh @@ -0,0 +1,90 @@ +#!/bin/bash + +echo "" + +SCRIPT=$(readlink -f "$0") +INSTALLPATH=$(dirname "${SCRIPT}") +TOPDIR=$(dirname "${INSTALLPATH}") +default_ccnet_conf_dir=${TOPDIR}/ccnet +default_seafile_data_dir=${TOPDIR}/seafile-data +default_conf_dir=${TOPDIR}/conf + +migrate_to_ceph=${INSTALLPATH}/seafobj_migrate.py + +script_name=$0 +function usage () { + echo "usage : " + echo "$(basename ${script_name}) ceph_seafile_central_conf_dir" + echo "" +} + +function check_python_executable() { + if [[ "$PYTHON" != "" && -x $PYTHON ]]; then + return 0 + fi + + if which python2.7 2>/dev/null 1>&2; then + PYTHON=python2.7 + elif which python27 2>/dev/null 1>&2; then + PYTHON=python27 + elif which python2.6 2>/dev/null 1>&2; then + PYTHON=python2.6 + elif which python26 2>/dev/null 1>&2; then + PYTHON=python26 + else + echo + echo "Can't find a python executable of version 2.6 or above in PATH" + echo "Install python 2.6+ before continue." + echo "Or if you installed it in a non-standard PATH, set the PYTHON enviroment varirable to it" + echo + exit 1 + fi +} + +function validate_ccnet_conf_dir () { + if [[ ! -d ${default_ccnet_conf_dir} ]]; then + echo "Error: there is no ccnet config directory." + echo "Have you run setup-seafile.sh before this?" + echo "" + exit -1; + fi +} + +function do_migrate_to_ceph () { + validate_ccnet_conf_dir; + + export CCNET_CONF_DIR=${default_ccnet_conf_dir} + export SEAFILE_CONF_DIR=${default_seafile_data_dir} + export SEAFILE_CENTRAL_CONF_DIR=${default_conf_dir} + export CEPH_SEAFILE_CENTRAL_CONF_DIR=${ceph_seafile_central_conf_dir} + + export PYTHONPATH=${INSTALLPATH}/seafile/lib/python2.6/site-packages:${INSTALLPATH}/seafile/lib64/python2.6/site-packages:${INSTALLPATH}/seahub/thirdpart:$PYTHONPATH + export PYTHONPATH=${INSTALLPATH}/seafile/lib/python2.7/site-packages:${INSTALLPATH}/seafile/lib64/python2.7/site-packages:$PYTHONPATH + + $PYTHON ${migrate_to_ceph} +} + +check_python_executable; + +if [ $# -gt 0 ]; +then + for param in $@; + do + if [ ${param} = "-h" -o ${param} = "--help" ]; + then + usage; + exit 1; + fi + done +fi + +if [ $# -ne 1 ]; +then + usage; + exit 1; +fi + +ceph_seafile_central_conf_dir="$1" +do_migrate_to_ceph; + +echo "Done." diff --git a/scripts/seafobj_migrate.py b/scripts/seafobj_migrate.py new file mode 100755 index 0000000..3c1f8df --- /dev/null +++ b/scripts/seafobj_migrate.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python +#coding: utf-8 + +import os +import sys +import logging +from threading import Thread +import Queue +import rados + +from seafobj.objstore_factory import SeafObjStoreFactory + +logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) + +class Worker(Thread): + def __init__(self, do_work, task_queue): + Thread.__init__(self) + self.do_work = do_work + self.task_queue = task_queue + + def run(self): + while True: + try: + task = self.task_queue.get() + if task is None: + break + self.do_work(task) + except Exception as e: + logging.warning('Failed to execute task: %s' % e) + finally: + self.task_queue.task_done() + +class ThreadPool(object): + def __init__(self, do_work, nworker=20): + self.do_work = do_work + self.nworker = nworker + self.task_queue = Queue.Queue() + + def start(self): + for i in xrange(self.nworker): + Worker(self.do_work, self.task_queue).start() + + def put_task(self, task): + self.task_queue.put(task) + + def join(self): + self.task_queue.join() + # notify all thread to stop + for i in xrange(self.nworker): + self.task_queue.put(None) + +class Task(object): + def __init__(self, repo_id, repo_version, obj_id): + self.repo_id = repo_id + self.repo_version = repo_version + self.obj_id = obj_id + +class ObjMigrateWorker(Thread): + def __init__(self, orig_obj_factory, dest_obj_factory, dtype): + Thread.__init__(self) + self.dtype = dtype + self.orig_store = orig_obj_factory.get_obj_store(dtype) + self.dest_store = dest_obj_factory.get_obj_store(dtype) + self.thread_pool = ThreadPool(self.do_work) + + def run(self): + logging.info('Start to migrate [%s] object' % self.dtype) + self.thread_pool.start() + self.migrate() + self.thread_pool.join() + logging.info('Complete migrate [%s] object' % self.dtype) + + def do_work(self, task): + ioctx = self.dest_store.ceph_client.ioctx_pool.get_ioctx(task.repo_id) + try: + ioctx.stat(task.obj_id) + except rados.ObjectNotFound: + try: + data = self.orig_store.read_obj_raw(task.repo_id, task.repo_version, task.obj_id) + except Exception as e: + logging.warning('[%s] Failed to read object %s from repo %s: %s' % (self.dtype, task.obj_id, task.repo_id, e)) + raise + + try: + ioctx.write_full(task.obj_id, data) + except Exception as e: + logging.warning('[%s] Failed to write object %s of repo %s to Ceph: %s' % (self.dtype, task.obj_id, task.repo_id, e)) + raise + except Exception as e: + logging.warning('[%s] Failed to stat object %s of repo %s in Ceph: %s' % (self.dtype, task.obj_id, task.repo_id, e)) + raise + finally: + self.dest_store.ceph_client.ioctx_pool.return_ioctx(ioctx) + + def migrate(self): + top_path = self.orig_store.obj_dir + for repo_id in os.listdir(top_path): + repo_path = os.path.join(top_path, repo_id) + for spath in os.listdir(repo_path): + obj_path = os.path.join(repo_path, spath) + for lpath in os.listdir(obj_path): + obj_id = spath + lpath + task = Task(repo_id, 1, obj_id) + self.thread_pool.put_task(task) + +def main(): + try: + fs_obj_factory = SeafObjStoreFactory() + os.environ['SEAFILE_CENTRAL_CONF_DIR'] = os.environ['CEPH_SEAFILE_CENTRAL_CONF_DIR'] + except KeyError: + logging.warning('CEPH_SEAFILE_CENTRAL_CONF_DIR environment variable is not set.\n') + sys.exit() + + ceph_obj_factory = SeafObjStoreFactory() + + dtypes = ['commits', 'fs', 'blocks'] + for dtype in dtypes: + ObjMigrateWorker(fs_obj_factory, ceph_obj_factory, dtype).start() + +if __name__ == '__main__': + main()