1
0
mirror of https://github.com/haiwen/seafile-server.git synced 2025-04-29 11:54:45 +00:00
seafile-server/scripts/check_init_admin.py

369 lines
11 KiB
Python
Raw Normal View History

2016-08-10 06:53:33 +00:00
#coding: UTF-8
'''Check whether an admin account exists, and prompt the user to create one if none exists.'''
import json
2016-08-10 06:53:33 +00:00
import sys
import os
import time
import re
import shutil
import glob
import subprocess
import hashlib
import getpass
import uuid
import warnings
2019-06-28 05:30:31 +00:00
from configparser import ConfigParser
2016-08-10 06:53:33 +00:00
2020-11-20 07:37:03 +00:00
from seaserv import ccnet_api
2016-08-10 06:53:33 +00:00
try:
    # Imported only for its side effect: once loaded, input() gains
    # line-editing and history support. The name itself is never used.
    import readline  # pylint: disable=W0611
except ImportError:
    # readline is optional; prompts still work without it.
    pass

# URL of the server manual, interpolated into the welcome banner.
SERVER_MANUAL_HTTP = 'https://download.seafile.com/published/seafile-manual/home.md'
2016-08-10 06:53:33 +00:00
class Utils(object):
    '''Groups all helper functions here'''

    @staticmethod
    def welcome():
        '''Show welcome message and wait for the user to press ENTER'''
        welcome_msg = '''\
-----------------------------------------------------------------
This script will guide you to setup your seafile server using MySQL.
Make sure you have read seafile server manual at
%s
Press ENTER to continue
-----------------------------------------------------------------''' % SERVER_MANUAL_HTTP
        print(welcome_msg)
        input()

    @staticmethod
    def highlight(content):
        '''Add ANSI color to content to get it highlighted on terminal'''
        return '\x1b[33m%s\x1b[m' % content

    @staticmethod
    def info(msg):
        '''Print an informational message'''
        print(msg)

    @staticmethod
    def error(msg):
        '''Print error and exit with status 1'''
        print()
        print('Error: ' + msg)
        sys.exit(1)

    @staticmethod
    def _spawn(cmd, cwd, env, suppress_stdout, suppress_stderr, shell):
        '''Shared implementation of run_argv() and run().

        Starts "cmd" with subprocess.Popen, waits for it to finish and
        returns its exit code. A suppressed stream is redirected to
        os.devnull; otherwise the child inherits this process's stream.
        '''
        with open(os.devnull, 'w') as devnull:
            proc = subprocess.Popen(cmd,
                                    cwd=cwd,
                                    stdout=devnull if suppress_stdout else sys.stdout,
                                    stderr=devnull if suppress_stderr else sys.stderr,
                                    env=env,
                                    shell=shell)
            # Wait inside the "with" so devnull stays open while the child runs.
            return proc.wait()

    @staticmethod
    def run_argv(argv, cwd=None, env=None, suppress_stdout=False, suppress_stderr=False):
        '''Run a program and wait it to finish, and return its exit code.

        @argv the program and its arguments as a list
        @cwd working directory for the program, or None to keep the current one
        @env environment for the program, or None to inherit the current one
        @suppress_stdout If true, the program's standard output is discarded
        @suppress_stderr If true, the program's standard error is discarded
        '''
        return Utils._spawn(argv, cwd, env, suppress_stdout, suppress_stderr, False)

    @staticmethod
    def run(cmdline, cwd=None, env=None, suppress_stdout=False, suppress_stderr=False):
        '''Like run_argv but specify a command line string instead of argv.

        The command line is executed through the shell (shell=True), so it
        must never be built from untrusted input.
        '''
        return Utils._spawn(cmdline, cwd, env, suppress_stdout, suppress_stderr, True)

    @staticmethod
    def prepend_env_value(name, value, env=None, seperator=':'):
        '''Prepend "value" to the "seperator"-joined list stored in env[name].

        @env the mapping to modify; os.environ is used when it is None
        '''
        if env is None:
            env = os.environ

        current_value = env.get(name, '')
        new_value = value
        if current_value:
            new_value += seperator + current_value

        env[name] = new_value

    @staticmethod
    def must_mkdir(path):
        '''Create a directory, exit on failure'''
        try:
            os.mkdir(path)
        except OSError as e:
            Utils.error('failed to create directory %s:%s' % (path, e))

    @staticmethod
    def must_copy(src, dst):
        '''Copy src to dst, exit on failure'''
        try:
            shutil.copy(src, dst)
        except Exception as e:
            Utils.error('failed to copy %s to %s: %s' % (src, dst, e))

    @staticmethod
    def find_in_path(prog):
        '''Return the first existing "<dir>/<prog>" among the PATH entries.

        Returns None when no entry contains "prog", including when PATH is
        not set at all.
        '''
        # os.pathsep is ';' on Windows and ':' elsewhere.
        for d in os.environ.get('PATH', '').split(os.pathsep):
            d = d.strip()
            if d == '':
                continue
            path = os.path.join(d, prog)
            if os.path.exists(path):
                return path

        return None

    @staticmethod
    def get_python_executable():
        '''Return the python executable. This should be the PYTHON environment
        variable which is set in setup-seafile-mysql.sh

        Raises KeyError if PYTHON is not set.
        '''
        return os.environ['PYTHON']

    @staticmethod
    def read_config(fn):
        '''Return a case sensitive ConfigParser by reading the file "fn"'''
        cp = ConfigParser()
        # The default optionxform lower-cases option names; str keeps them as is.
        cp.optionxform = str
        cp.read(fn)

        return cp

    @staticmethod
    def write_config(cp, fn):
        '''Write the ConfigParser "cp" to the file "fn", overwriting it'''
        with open(fn, 'w') as fp:
            cp.write(fp)

    @staticmethod
    def ask_question(desc,
                     key=None,
                     note=None,
                     default=None,
                     validate=None,
                     yes_or_no=False,
                     password=False):
        '''Ask a question, return the answer.
        @desc description, e.g. "What is the port of ccnet?"

        @key a name to represent the target of the question, e.g. "port for
        ccnet server"

        @note additional information for the question, e.g. "Must be a valid
        port number"

        @default the default value of the question. If the default value is
        not None, when the user enter nothing and press [ENTER], the default
        value would be returned

        @validate a function that takes the user input as the only parameter
        and validate it. It should return a validated value, or throws an
        "InvalidAnswer" exception if the input is not valid.

        @yes_or_no If true, the user must answer "yes" or "no", and a boolean
        value would be returned

        @password If true, the user input would not be echoed to the
        console

        '''
        assert key or yes_or_no
        # Compare against None as documented above, so that a falsy default
        # is still honoured instead of being silently ignored.
        has_default = default is not None

        # Format description
        print()
        if note:
            desc += '\n' + note

        desc += '\n'
        if yes_or_no:
            desc += '[ yes or no ]'
        elif has_default:
            desc += '[ default "%s" ]' % default
        else:
            desc += '[ %s ]' % key

        desc += ' '
        # getpass does not echo the typed characters to the console.
        read_answer = getpass.getpass if password else input

        while True:
            # prompt for user input
            answer = read_answer(desc).strip()

            # No user input: use default, or ask again when there is none
            if not answer:
                if not has_default:
                    continue
                answer = default

            # Have user input: validate answer
            if yes_or_no:
                if answer not in ['yes', 'no']:
                    print(Utils.highlight('\nPlease answer yes or no\n'))
                    continue
                return answer == 'yes'

            if not validate:
                return answer

            try:
                return validate(answer)
            except InvalidAnswer as e:
                # Show the reason and prompt again.
                print(Utils.highlight('\n%s\n' % e))

    @staticmethod
    def validate_port(port):
        '''Return "port" as an int, or raise InvalidAnswer if it is not an
        integer in the range 1-65535'''
        try:
            port = int(port)
        except ValueError:
            raise InvalidAnswer('%s is not a valid port' % Utils.highlight(port))

        if port <= 0 or port > 65535:
            raise InvalidAnswer('%s is not a valid port' % Utils.highlight(port))

        return port
class InvalidAnswer(Exception):
    '''Raised by answer validators to reject the user's input.

    @msg the human readable reason; also available as the "msg" attribute
    and returned by str().
    '''

    def __init__(self, msg):
        # Hand the message to Exception as well so that "args", repr() and
        # tracebacks carry it instead of being empty.
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        return str(self.msg)
### END of Utils
####################
def need_create_admin():
    '''Return True when ccnet reports no user at all, i.e. an admin
    account still has to be created.'''
    # Presumably (source, start, limit): one row is enough to know whether
    # any user exists -- TODO confirm against the ccnet_api signature.
    return len(ccnet_api.get_emailusers('DB', 0, 1)) == 0
def create_admin(email, passwd):
    '''Create the admin account through ccnet_api and print a success banner.

    Raises Exception when ccnet_api.add_emailuser() returns a negative value.
    '''
    # NOTE(review): the two trailing 1s look like is_staff / is_active
    # flags -- confirm against the ccnet_api signature.
    status = ccnet_api.add_emailuser(email, passwd, 1, 1)
    if status < 0:
        raise Exception('failed to create admin')

    rule = '----------------------------------------'
    for line in ('\n\n', rule, 'Successfully created seafile admin', rule, '\n\n'):
        print(line)
2016-08-10 06:53:33 +00:00
def ask_admin_email():
    '''Print the first-run banner and prompt until a valid admin email
    address is entered. Returns the address.'''
    print()
    print('----------------------------------------')
    print('It\'s the first time you start the seafile server. Now let\'s create the admin account')
    print('----------------------------------------')

    def validate(email):
        '''Return "email" unchanged, or raise InvalidAnswer if it is invalid'''
        # whitespace is not allowed anywhere in the address; re.search is
        # needed because re.match would only look at the first character
        if re.search(r'\s', email):
            raise InvalidAnswer('%s is not a valid email address' % Utils.highlight(email))
        # must be a valid email address
        if not re.match(r'^.+@.*\..+$', email):
            raise InvalidAnswer('%s is not a valid email address' % Utils.highlight(email))

        return email

    key = 'admin email'
    question = 'What is the ' + Utils.highlight('email') + ' for the admin account?'
    return Utils.ask_question(question,
                              key=key,
                              validate=validate)
def ask_admin_password():
    '''Prompt for the admin password twice, without echo, and return it.

    The first prompt is repeated until both entries match.
    '''
    def validate(password):
        '''Ask for the password a second time; raise InvalidAnswer when the
        two entries differ'''
        confirmation = Utils.ask_question(
            'Enter the ' + Utils.highlight('password again:'),
            key='admin password again',
            password=True)

        if confirmation == password:
            return password

        raise InvalidAnswer('password mismatch')

    prompt = 'What is the ' + Utils.highlight('password') + ' for the admin account?'
    return Utils.ask_question(prompt,
                              key='admin password',
                              password=True,
                              validate=validate)
def main():
    '''Create the admin account if no user exists yet.

    Credentials are read from "admin.txt" (a JSON object with "email" and
    "password" keys) in $SEAFILE_CENTRAL_CONF_DIR when that file exists,
    and the file is deleted after reading. Otherwise the user is prompted.
    '''
    if need_create_admin():
        conf_dir = os.environ['SEAFILE_CENTRAL_CONF_DIR']
        password_file = os.path.join(conf_dir, 'admin.txt')

        if not os.path.exists(password_file):
            email = ask_admin_email()
            passwd = ask_admin_password()
        else:
            with open(password_file, 'r') as fp:
                pwinfo = json.load(fp)
            email, passwd = pwinfo['email'], pwinfo['password']
            # Remove the file once its credentials have been read.
            os.unlink(password_file)

        create_admin(email, passwd)
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C at a prompt: report it and exit with a failure status.
        print('\n\n\n')
        print(Utils.highlight('Aborted.'))
        print()
        sys.exit(1)
    except Exception as e:
        # Best effort: report the failure but, as before, do not change the
        # exit status. The cause is printed so the failure can be diagnosed
        # instead of being silently discarded.
        print()
        print(Utils.highlight('Error happened during creating seafile admin.'))
        print('%s: %s' % (type(e).__name__, e))
        print()