mirror of
https://github.com/haiwen/seafile-server.git
synced 2025-09-02 16:04:26 +00:00
[script] Add ceph migrate script.
This commit is contained in:
90
scripts/migrate-to-ceph.sh
Executable file
90
scripts/migrate-to-ceph.sh
Executable file
@@ -0,0 +1,90 @@
|
||||
#!/bin/bash
|
||||
|
||||
echo ""
|
||||
|
||||
SCRIPT=$(readlink -f "$0")
|
||||
INSTALLPATH=$(dirname "${SCRIPT}")
|
||||
TOPDIR=$(dirname "${INSTALLPATH}")
|
||||
default_ccnet_conf_dir=${TOPDIR}/ccnet
|
||||
default_seafile_data_dir=${TOPDIR}/seafile-data
|
||||
default_conf_dir=${TOPDIR}/conf
|
||||
|
||||
migrate_to_ceph=${INSTALLPATH}/seafobj_migrate.py
|
||||
|
||||
script_name=$0
|
||||
function usage () {
|
||||
echo "usage : "
|
||||
echo "$(basename ${script_name}) ceph_seafile_central_conf_dir"
|
||||
echo ""
|
||||
}
|
||||
|
||||
function check_python_executable() {
|
||||
if [[ "$PYTHON" != "" && -x $PYTHON ]]; then
|
||||
return 0
|
||||
fi
|
||||
|
||||
if which python2.7 2>/dev/null 1>&2; then
|
||||
PYTHON=python2.7
|
||||
elif which python27 2>/dev/null 1>&2; then
|
||||
PYTHON=python27
|
||||
elif which python2.6 2>/dev/null 1>&2; then
|
||||
PYTHON=python2.6
|
||||
elif which python26 2>/dev/null 1>&2; then
|
||||
PYTHON=python26
|
||||
else
|
||||
echo
|
||||
echo "Can't find a python executable of version 2.6 or above in PATH"
|
||||
echo "Install python 2.6+ before continue."
|
||||
echo "Or if you installed it in a non-standard PATH, set the PYTHON enviroment varirable to it"
|
||||
echo
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
function validate_ccnet_conf_dir () {
|
||||
if [[ ! -d ${default_ccnet_conf_dir} ]]; then
|
||||
echo "Error: there is no ccnet config directory."
|
||||
echo "Have you run setup-seafile.sh before this?"
|
||||
echo ""
|
||||
exit -1;
|
||||
fi
|
||||
}
|
||||
|
||||
function do_migrate_to_ceph () {
|
||||
validate_ccnet_conf_dir;
|
||||
|
||||
export CCNET_CONF_DIR=${default_ccnet_conf_dir}
|
||||
export SEAFILE_CONF_DIR=${default_seafile_data_dir}
|
||||
export SEAFILE_CENTRAL_CONF_DIR=${default_conf_dir}
|
||||
export CEPH_SEAFILE_CENTRAL_CONF_DIR=${ceph_seafile_central_conf_dir}
|
||||
|
||||
export PYTHONPATH=${INSTALLPATH}/seafile/lib/python2.6/site-packages:${INSTALLPATH}/seafile/lib64/python2.6/site-packages:${INSTALLPATH}/seahub/thirdpart:$PYTHONPATH
|
||||
export PYTHONPATH=${INSTALLPATH}/seafile/lib/python2.7/site-packages:${INSTALLPATH}/seafile/lib64/python2.7/site-packages:$PYTHONPATH
|
||||
|
||||
$PYTHON ${migrate_to_ceph}
|
||||
}
|
||||
|
||||
check_python_executable;
|
||||
|
||||
if [ $# -gt 0 ];
|
||||
then
|
||||
for param in $@;
|
||||
do
|
||||
if [ ${param} = "-h" -o ${param} = "--help" ];
|
||||
then
|
||||
usage;
|
||||
exit 1;
|
||||
fi
|
||||
done
|
||||
fi
|
||||
|
||||
if [ $# -ne 1 ];
|
||||
then
|
||||
usage;
|
||||
exit 1;
|
||||
fi
|
||||
|
||||
ceph_seafile_central_conf_dir="$1"
|
||||
do_migrate_to_ceph;
|
||||
|
||||
echo "Done."
|
121
scripts/seafobj_migrate.py
Executable file
121
scripts/seafobj_migrate.py
Executable file
@@ -0,0 +1,121 @@
|
||||
#!/usr/bin/env python
|
||||
#coding: utf-8
|
||||
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
from threading import Thread
|
||||
import Queue
|
||||
import rados
|
||||
|
||||
from seafobj.objstore_factory import SeafObjStoreFactory
|
||||
|
||||
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
|
||||
|
||||
class Worker(Thread):
|
||||
def __init__(self, do_work, task_queue):
|
||||
Thread.__init__(self)
|
||||
self.do_work = do_work
|
||||
self.task_queue = task_queue
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
try:
|
||||
task = self.task_queue.get()
|
||||
if task is None:
|
||||
break
|
||||
self.do_work(task)
|
||||
except Exception as e:
|
||||
logging.warning('Failed to execute task: %s' % e)
|
||||
finally:
|
||||
self.task_queue.task_done()
|
||||
|
||||
class ThreadPool(object):
|
||||
def __init__(self, do_work, nworker=20):
|
||||
self.do_work = do_work
|
||||
self.nworker = nworker
|
||||
self.task_queue = Queue.Queue()
|
||||
|
||||
def start(self):
|
||||
for i in xrange(self.nworker):
|
||||
Worker(self.do_work, self.task_queue).start()
|
||||
|
||||
def put_task(self, task):
|
||||
self.task_queue.put(task)
|
||||
|
||||
def join(self):
|
||||
self.task_queue.join()
|
||||
# notify all thread to stop
|
||||
for i in xrange(self.nworker):
|
||||
self.task_queue.put(None)
|
||||
|
||||
class Task(object):
|
||||
def __init__(self, repo_id, repo_version, obj_id):
|
||||
self.repo_id = repo_id
|
||||
self.repo_version = repo_version
|
||||
self.obj_id = obj_id
|
||||
|
||||
class ObjMigrateWorker(Thread):
|
||||
def __init__(self, orig_obj_factory, dest_obj_factory, dtype):
|
||||
Thread.__init__(self)
|
||||
self.dtype = dtype
|
||||
self.orig_store = orig_obj_factory.get_obj_store(dtype)
|
||||
self.dest_store = dest_obj_factory.get_obj_store(dtype)
|
||||
self.thread_pool = ThreadPool(self.do_work)
|
||||
|
||||
def run(self):
|
||||
logging.info('Start to migrate [%s] object' % self.dtype)
|
||||
self.thread_pool.start()
|
||||
self.migrate()
|
||||
self.thread_pool.join()
|
||||
logging.info('Complete migrate [%s] object' % self.dtype)
|
||||
|
||||
def do_work(self, task):
|
||||
ioctx = self.dest_store.ceph_client.ioctx_pool.get_ioctx(task.repo_id)
|
||||
try:
|
||||
ioctx.stat(task.obj_id)
|
||||
except rados.ObjectNotFound:
|
||||
try:
|
||||
data = self.orig_store.read_obj_raw(task.repo_id, task.repo_version, task.obj_id)
|
||||
except Exception as e:
|
||||
logging.warning('[%s] Failed to read object %s from repo %s: %s' % (self.dtype, task.obj_id, task.repo_id, e))
|
||||
raise
|
||||
|
||||
try:
|
||||
ioctx.write_full(task.obj_id, data)
|
||||
except Exception as e:
|
||||
logging.warning('[%s] Failed to write object %s of repo %s to Ceph: %s' % (self.dtype, task.obj_id, task.repo_id, e))
|
||||
raise
|
||||
except Exception as e:
|
||||
logging.warning('[%s] Failed to stat object %s of repo %s in Ceph: %s' % (self.dtype, task.obj_id, task.repo_id, e))
|
||||
raise
|
||||
finally:
|
||||
self.dest_store.ceph_client.ioctx_pool.return_ioctx(ioctx)
|
||||
|
||||
def migrate(self):
|
||||
top_path = self.orig_store.obj_dir
|
||||
for repo_id in os.listdir(top_path):
|
||||
repo_path = os.path.join(top_path, repo_id)
|
||||
for spath in os.listdir(repo_path):
|
||||
obj_path = os.path.join(repo_path, spath)
|
||||
for lpath in os.listdir(obj_path):
|
||||
obj_id = spath + lpath
|
||||
task = Task(repo_id, 1, obj_id)
|
||||
self.thread_pool.put_task(task)
|
||||
|
||||
def main():
|
||||
try:
|
||||
fs_obj_factory = SeafObjStoreFactory()
|
||||
os.environ['SEAFILE_CENTRAL_CONF_DIR'] = os.environ['CEPH_SEAFILE_CENTRAL_CONF_DIR']
|
||||
except KeyError:
|
||||
logging.warning('CEPH_SEAFILE_CENTRAL_CONF_DIR environment variable is not set.\n')
|
||||
sys.exit()
|
||||
|
||||
ceph_obj_factory = SeafObjStoreFactory()
|
||||
|
||||
dtypes = ['commits', 'fs', 'blocks']
|
||||
for dtype in dtypes:
|
||||
ObjMigrateWorker(fs_obj_factory, ceph_obj_factory, dtype).start()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Reference in New Issue
Block a user