1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-10-26 14:41:45 +00:00
Files
seahub/scripts/check_init_admin.py

369 lines
11 KiB
Python
Raw Normal View History

2021-12-03 15:35:16 +08:00
#coding: UTF-8
'''This script would check if there is admin, and prompt the user to create a new one if non exist'''
import json
import sys
import os
import time
import re
import shutil
import glob
import subprocess
import hashlib
import getpass
import uuid
import warnings
from configparser import ConfigParser
from seaserv import ccnet_api
try:
import readline # pylint: disable=W0611
except ImportError:
pass
SERVER_MANUAL_HTTP = 'https://download.seafile.com/published/seafile-manual/home.md'
class Utils(object):
'''Groups all helper functions here'''
@staticmethod
def welcome():
'''Show welcome message'''
welcome_msg = '''\
-----------------------------------------------------------------
This script will guide you to setup your seafile server using MySQL.
Make sure you have read seafile server manual at
%s
Press ENTER to continue
-----------------------------------------------------------------''' % SERVER_MANUAL_HTTP
print(welcome_msg)
input()
@staticmethod
def highlight(content):
'''Add ANSI color to content to get it highlighted on terminal'''
return '\x1b[33m%s\x1b[m' % content
@staticmethod
def info(msg):
print(msg)
@staticmethod
def error(msg):
'''Print error and exit'''
print()
print('Error: ' + msg)
sys.exit(1)
@staticmethod
def run_argv(argv, cwd=None, env=None, suppress_stdout=False, suppress_stderr=False):
'''Run a program and wait it to finish, and return its exit code. The
standard output of this program is supressed.
'''
with open(os.devnull, 'w') as devnull:
if suppress_stdout:
stdout = devnull
else:
stdout = sys.stdout
if suppress_stderr:
stderr = devnull
else:
stderr = sys.stderr
proc = subprocess.Popen(argv,
cwd=cwd,
stdout=stdout,
stderr=stderr,
env=env)
return proc.wait()
@staticmethod
def run(cmdline, cwd=None, env=None, suppress_stdout=False, suppress_stderr=False):
'''Like run_argv but specify a command line string instead of argv'''
with open(os.devnull, 'w') as devnull:
if suppress_stdout:
stdout = devnull
else:
stdout = sys.stdout
if suppress_stderr:
stderr = devnull
else:
stderr = sys.stderr
proc = subprocess.Popen(cmdline,
cwd=cwd,
stdout=stdout,
stderr=stderr,
env=env,
shell=True)
return proc.wait()
@staticmethod
def prepend_env_value(name, value, env=None, seperator=':'):
'''prepend a new value to a list'''
if env is None:
env = os.environ
try:
current_value = env[name]
except KeyError:
current_value = ''
new_value = value
if current_value:
new_value += seperator + current_value
env[name] = new_value
@staticmethod
def must_mkdir(path):
'''Create a directory, exit on failure'''
try:
os.mkdir(path)
except OSError as e:
Utils.error('failed to create directory %s:%s' % (path, e))
@staticmethod
def must_copy(src, dst):
'''Copy src to dst, exit on failure'''
try:
shutil.copy(src, dst)
except Exception as e:
Utils.error('failed to copy %s to %s: %s' % (src, dst, e))
@staticmethod
def find_in_path(prog):
if 'win32' in sys.platform:
sep = ';'
else:
sep = ':'
dirs = os.environ['PATH'].split(sep)
for d in dirs:
d = d.strip()
if d == '':
continue
path = os.path.join(d, prog)
if os.path.exists(path):
return path
return None
@staticmethod
def get_python_executable():
'''Return the python executable. This should be the PYTHON environment
variable which is set in setup-seafile-mysql.sh
'''
return os.environ['PYTHON']
@staticmethod
def read_config(fn):
'''Return a case sensitive ConfigParser by reading the file "fn"'''
cp = ConfigParser()
cp.optionxform = str
cp.read(fn)
return cp
@staticmethod
def write_config(cp, fn):
'''Return a case sensitive ConfigParser by reading the file "fn"'''
with open(fn, 'w') as fp:
cp.write(fp)
@staticmethod
def ask_question(desc,
key=None,
note=None,
default=None,
validate=None,
yes_or_no=False,
password=False):
'''Ask a question, return the answer.
@desc description, e.g. "What is the port of ccnet?"
@key a name to represent the target of the question, e.g. "port for
ccnet server"
@note additional information for the question, e.g. "Must be a valid
port number"
@default the default value of the question. If the default value is
not None, when the user enter nothing and press [ENTER], the default
value would be returned
@validate a function that takes the user input as the only parameter
and validate it. It should return a validated value, or throws an
"InvalidAnswer" exception if the input is not valid.
@yes_or_no If true, the user must answer "yes" or "no", and a boolean
value would be returned
@password If true, the user input would not be echoed to the
console
'''
assert key or yes_or_no
# Format description
print()
if note:
desc += '\n' + note
desc += '\n'
if yes_or_no:
desc += '[ yes or no ]'
else:
if default:
desc += '[ default "%s" ]' % default
else:
desc += '[ %s ]' % key
desc += ' '
while True:
# prompt for user input
if password:
answer = getpass.getpass(desc).strip()
else:
answer = input(desc).strip()
# No user input: use default
if not answer:
if default:
answer = default
else:
continue
# Have user input: validate answer
if yes_or_no:
if answer not in ['yes', 'no']:
print(Utils.highlight('\nPlease answer yes or no\n'))
continue
else:
return answer == 'yes'
else:
if validate:
try:
return validate(answer)
except InvalidAnswer as e:
print(Utils.highlight('\n%s\n' % e))
continue
else:
return answer
@staticmethod
def validate_port(port):
try:
port = int(port)
except ValueError:
raise InvalidAnswer('%s is not a valid port' % Utils.highlight(port))
if port <= 0 or port > 65535:
raise InvalidAnswer('%s is not a valid port' % Utils.highlight(port))
return port
class InvalidAnswer(Exception):
def __init__(self, msg):
Exception.__init__(self)
self.msg = msg
def __str__(self):
return self.msg
### END of Utils
####################
def need_create_admin():
users = ccnet_api.get_emailusers('DB', 0, 1)
return len(users) == 0
def create_admin(email, passwd):
if ccnet_api.add_emailuser(email, passwd, 1, 1) < 0:
raise Exception('failed to create admin')
else:
print('\n\n')
print('----------------------------------------')
print('Successfully created seafile admin')
print('----------------------------------------')
print('\n\n')
def ask_admin_email():
print()
print('----------------------------------------')
print('It\'s the first time you start the seafile server. Now let\'s create the admin account')
print('----------------------------------------')
def validate(email):
# whitespace is not allowed
if re.match(r'[\s]', email):
raise InvalidAnswer('%s is not a valid email address' % Utils.highlight(email))
# must be a valid email address
if not re.match(r'^.+@.*\..+$', email):
raise InvalidAnswer('%s is not a valid email address' % Utils.highlight(email))
return email
key = 'admin email'
question = 'What is the ' + Utils.highlight('email') + ' for the admin account?'
return Utils.ask_question(question,
key=key,
validate=validate)
def ask_admin_password():
def validate(password):
key = 'admin password again'
question = 'Enter the ' + Utils.highlight('password again:')
password_again = Utils.ask_question(question,
key=key,
password=True)
if password_again != password:
raise InvalidAnswer('password mismatch')
return password
key = 'admin password'
question = 'What is the ' + Utils.highlight('password') + ' for the admin account?'
return Utils.ask_question(question,
key=key,
password=True,
validate=validate)
def main():
if not need_create_admin():
return
password_file = os.path.join(os.environ['SEAFILE_CENTRAL_CONF_DIR'], 'admin.txt')
if os.path.exists(password_file):
with open(password_file, 'r') as fp:
pwinfo = json.load(fp)
email = pwinfo['email']
passwd = pwinfo['password']
os.unlink(password_file)
else:
email = ask_admin_email()
passwd = ask_admin_password()
create_admin(email, passwd)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('\n\n\n')
print(Utils.highlight('Aborted.'))
print()
sys.exit(1)
except Exception as e:
print()
print(Utils.highlight('Error happened during creating seafile admin.'))
print()