jumpserver/jms
老广 1fd2e782f8
1.5.7 Merge to dev (#3766)
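* [Update] Stash: the optimization did not solve the problem

* [Update] To be continued (Xiaobai)

* [Update] Modify asset user

* [Update] Plan changed again

* [Update] Modify asset user

* [Update] Stash and favorites

* [Update] Add id in

* [Update] Partially complete the ops task work

* [Update] Modify asset user API

* [Update] Modify asset user tasks, view auth info, etc.

* [Update] Mostly complete the asset user refactor

* [Update] dynamic user only allow 1

* [Update] Modify asset user task

* [Update] Modify node admin user task API

* [Update] remove file header license

* [Update] Add sftp root

* [Update] Stash

* [Update] Stash

* [Update] Update translations

* [Update] After a system user is switched to same-name mode, set its username to empty

* [Update] Mostly complete CAS research

* [Update] Support CAS server

* [Update] Support CAS server

* [Update] Add requirements

* [Update] Add mysql and ipython to the packages to ease debugging

* [Update] Add Huawei Cloud (huaweiyun) translations

* [Update] Add session replay download

* [Update] Show the replay-offline usage instructions only on the first notification

* [Update] Stash

* [Bugfix] Error when fetching system user info

* [Bugfix] Modify system user info

* [Update] Change to clean up 10 days git status

* [Update] Change Celery log retention period

* [Update] Fix version incompatibilities in some pip package dependencies (#3672)

* [Update] Fix the user update page clearing the user's public_key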

* Fix broken dependencies

Co-authored-by: BaiJiangJie <32935519+BaiJiangJie@users.noreply.github.com>

* [Update] Modify fetching of system user auth info

* [Update] Remove log

* [Bugfix] Fix sftp home setting bug

* [Update] Add sftp root to granted system users

* [Update] Modify users associated with system users

* [Update] Modify placeholder

* [Update] Optimize fetching of granted system users

* [Update] Modify tasks

* [Update] tree service update

* [Update] Stash

* [Update] Mostly complete the user permission tree and asset tree refactor

* [Update] Dashboard perf

* [Update] Add huawei cloud sdk requirements

* [Update] Optimize dashboard page

* [Update] Add id to system user auth info

* [Update] Modify system user serializer

* [Update] Optimize API

* [Update] LDAP Test Util (#3720)

* [Update] LDAPTestUtil 1

* [Update] LDAPTestUtil 2

* [Update] LDAPTestUtil 3

* [Update] LDAPTestUtil 4

* [Update] LDAPTestUtil 5

* [Update] LDAPTestUtil 6

* [Update] LDAPTestUtil 7

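* [Update] Add is_success to session and add a display serializer

* [Bugfix] Fix bug where empty nodes could not be deleted

* [Update] Display command records per organization

* [Update] Add migration file for Session is_success

* [Update] Add org_id to batch commands

* [Update] Fix some wording; users who have not bound MFA can no longer log in via SSH

* [Update] Modify replay API to return session info

* [Update] Fix the command record page failing to load because of an invalid ES

* [Update] Split profile view

* [Update] Fix a translation

* [Update] Modify async API framework

* [Update] Add risk level to command list

* [Update] Complete packaged download of replays

* [Update] Change login OTP page

* [Update] Modify command storage redis_level

* [Update] Update translations

* [Update] Modify the user list field of system users

* [Update] Use new logo and unify Jumpserver as JumpServer

* [Update] Optimize cloud task

* [Update] Unify period task

* [Update] Unify period form serializer fields

* [Update] Modify period task

* [Update] Modify asset gateway info

* [Update] Add domain to asset info in the user-granted asset tree

* [Update] Update translations

* [Update] Test connectivity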

* 1.5.7 bai (#3764)

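* [Update] Fix index page bug; fix asset user connectivity test issue

* [Update] Modify asset user connectivity test

* [Bugfix] Fix backends issue

* [Update] Change marksafe dependency version

* [Update] Modify asset user connectivity test

* [Update] Change how the percent value is obtained when checking server performance

* [Update] Update dependency boto3=1.12.14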

Co-authored-by: Yanzhe Lee <lee.yanzhe@yanzhe.org>
Co-authored-by: BaiJiangJie <32935519+BaiJiangJie@users.noreply.github.com>
Co-authored-by: Bai <bugatti_it@163.com>
2020-03-12 16:24:38 +08:00

551 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
# coding: utf-8
import os
import subprocess
import threading
import datetime
import logging
import logging.handlers
import psutil
import time
import argparse
import sys
import shutil
import signal
from collections import defaultdict
import daemon
from daemon import pidfile
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
try:
    from apps.jumpserver import const
    __version__ = const.VERSION
except ImportError as e:
    print("Not found __version__: {}".format(e))
    print("Python is: ")
    logging.info(subprocess.call('which python', shell=True))
    __version__ = 'Unknown'
    sys.exit(1)

try:
    from apps.jumpserver.const import CONFIG
except ImportError as e:
    print("Import error: {}".format(e))
    print("Could not find config file, `cp config_example.yml config.yml`")
    sys.exit(1)

os.environ["PYTHONIOENCODING"] = "UTF-8"
APPS_DIR = os.path.join(BASE_DIR, 'apps')
LOG_DIR = os.path.join(BASE_DIR, 'logs')
TMP_DIR = os.path.join(BASE_DIR, 'tmp')
HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1'
HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080
WS_PORT = CONFIG.WS_LISTEN_PORT or 8082
DEBUG = CONFIG.DEBUG or False
LOG_LEVEL = CONFIG.LOG_LEVEL or 'INFO'
START_TIMEOUT = 40
WORKERS = 4
DAEMON = False
LOG_KEEP_DAYS = 7
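# No effect: the root logger was already configured by basicConfig above,
# and basicConfig does nothing once handlers exist (unless force=True).
logging.basicConfig(
    format='%(asctime)s %(message)s', level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S'
)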
EXIT_EVENT = threading.Event()
LOCK = threading.Lock()
files_preserve = []
STOP_TIMEOUT = 10
logger = logging.getLogger()
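try:
    # exist_ok=True so that an existing "static" dir no longer prevents
    # "media" from being created.
    os.makedirs(os.path.join(BASE_DIR, "data", "static"), exist_ok=True)
    os.makedirs(os.path.join(BASE_DIR, "data", "media"), exist_ok=True)
except OSError:
    pass
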
def check_database_connection():
    os.chdir(os.path.join(BASE_DIR, 'apps'))
    for i in range(60):
        logging.info("Check database connection ...")
        code = subprocess.call("python manage.py showmigrations users ", shell=True)
        if code == 0:
            logging.info("Database connect success")
            return
        time.sleep(1)
    logging.info("Database connection failed, exit")
    sys.exit(10)

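`check_database_connection` retries `manage.py showmigrations users` once per second, up to 60 times, and exits with status 10 if the database never answers. The loop can be factored into a small generic helper. The sketch below is hypothetical (`wait_until` and `flaky_check` are illustrative names, not part of jms), and unlike the original it skips the sleep after the final failed attempt.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], attempts: int = 60, delay: float = 1.0) -> bool:
    """Call `check` up to `attempts` times, sleeping `delay` seconds between
    failed tries. Return True as soon as it succeeds, False if it never does."""
    for attempt in range(attempts):
        if check():
            return True
        if attempt < attempts - 1:  # no point sleeping after the last try
            time.sleep(delay)
    return False


# Usage: a fake check that only succeeds on its third call.
calls = {"n": 0}


def flaky_check() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_until(flaky_check, attempts=5, delay=0.01))  # True
print(calls["n"])  # 3
```

In the script, the check would be something like `lambda: subprocess.call(cmd, shell=True) == 0`, with `sys.exit(10)` called when the helper returns False.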
def make_migrations():
    logging.info("Check database structure change ...")
    os.chdir(os.path.join(BASE_DIR, 'apps'))
    logging.info("Migrate model change to database ...")
    subprocess.call('python3 manage.py migrate', shell=True)

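def collect_static():
    logging.info("Collect static files")
    os.chdir(os.path.join(BASE_DIR, 'apps'))
    # `&>` is bash-only; shells such as dash (/bin/sh on Debian/Ubuntu) parse it
    # as `&` (run in background) followed by `> /dev/null`, so use the portable form.
    command = 'python3 manage.py collectstatic --no-input -c > /dev/null 2>&1'
    subprocess.call(command, shell=True)
    logging.info("Collect static files done")
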
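`collect_static` builds a shell string and relies on shell redirection to silence `collectstatic`. Below is a sketch of a shell-free alternative. It assumes it is acceptable to launch the interpreter via `sys.executable` rather than whichever `python3` is first on `PATH`. `run_quiet` is a hypothetical helper name. Passing an argument list together with `subprocess.DEVNULL` avoids shell redirection syntax entirely.

```python
import subprocess
import sys


def run_quiet(args, cwd=None):
    """Run `args` (a list, no shell) with stdout and stderr discarded and
    return the exit code."""
    completed = subprocess.run(
        args,
        cwd=cwd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return completed.returncode


# Usage: a noisy child whose output is swallowed.
rc = run_quiet([sys.executable, "-c", "print('noisy output')"])
print(rc)  # 0
```

In jms, this would read roughly `run_quiet([sys.executable, "manage.py", "collectstatic", "--no-input", "-c"], cwd=APPS_DIR)`. It also keeps the return code, which the original discards.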
def prepare():
    check_database_connection()
    make_migrations()
    collect_static()

def check_pid(pid):
    """ Check for the existence of a unix pid. """
    try:
        os.kill(pid, 0)
    except (OSError, ProcessLookupError):
        return False
    else:
        return True

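`check_pid` uses the signal-0 idiom: `os.kill(pid, 0)` performs the existence and permission checks without delivering a signal. Because it catches every `OSError`, it also reports "not running" for `EPERM`, which `kill(2)` returns when the process exists but belongs to another user. The sketch below (POSIX only; `pid_alive` is a hypothetical name) separates the two cases. It also rejects PIDs of 0 or less, because `kill(0, …)` targets the caller's process group and `kill(-1, …)` targets every process the caller may signal. In jms, `is_running` already guards against the 0 returned for a missing PID file.

```python
import os
import subprocess
import sys


def pid_alive(pid: int) -> bool:
    """Return True if a process with this PID exists (POSIX only)."""
    if pid <= 0:
        # 0 and negative values address process groups, not one process.
        return False
    try:
        os.kill(pid, 0)  # signal 0: error checking only, nothing is delivered
    except ProcessLookupError:  # ESRCH: no such process
        return False
    except PermissionError:  # EPERM: exists, but owned by another user
        return True
    return True


print(pid_alive(os.getpid()))  # True
child = subprocess.Popen([sys.executable, "-c", "pass"])
child.wait()  # reap the child so its PID is really gone
print(pid_alive(child.pid))  # False (barring PID reuse)
```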
def get_pid_file_path(s):
    return os.path.join(TMP_DIR, '{}.pid'.format(s))


def get_log_file_path(s):
    return os.path.join(LOG_DIR, '{}.log'.format(s))

def get_pid_from_file(path):
    if os.path.isfile(path):
        with open(path) as f:
            try:
                return int(f.read().strip())
            except ValueError:
                return 0
    return 0


def get_pid(s):
    pid_file = get_pid_file_path(s)
    return get_pid_from_file(pid_file)

def is_running(s, unlink=True):
    pid_file = get_pid_file_path(s)

    if os.path.isfile(pid_file):
        pid = get_pid(s)
        if pid == 0:
            return False
        elif check_pid(pid):
            return True

        if unlink:
            os.unlink(pid_file)
    return False

def parse_service(s):
    all_services = [
        'gunicorn', 'celery_ansible', 'celery_default',
        'beat', 'flower', 'daphne',
    ]
    if s == 'all':
        return all_services
    elif s == "web":
        return ['gunicorn', 'flower', 'daphne']
    elif s == 'ws':
        return ['daphne']
    elif s == "task":
        return ["celery_ansible", "celery_default", "beat"]
    elif s == "celery":
        return ["celery_ansible", "celery_default"]
    elif "," in s:
        services = set()
        for i in s.split(','):
            services.update(parse_service(i))
        return services
    else:
        return [s]

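`parse_service` returns an ordered list for a named group but a `set` for a comma-separated spec such as `celery,beat`. Set iteration order for strings depends on per-process hash randomization, so the start order of those services can vary between runs. The sketch below is an order-preserving variant. `SERVICE_GROUPS` and `expand_services` are hypothetical names, and the alias table is copied from `parse_service`.

```python
# Alias table copied from parse_service; each value keeps the original order.
SERVICE_GROUPS = {
    "all": ["gunicorn", "celery_ansible", "celery_default", "beat", "flower", "daphne"],
    "web": ["gunicorn", "flower", "daphne"],
    "ws": ["daphne"],
    "task": ["celery_ansible", "celery_default", "beat"],
    "celery": ["celery_ansible", "celery_default"],
}


def expand_services(spec):
    """Expand a spec such as "celery,beat" into concrete service names,
    dropping duplicates while keeping first-seen order."""
    result = []
    for part in spec.split(","):
        # Unknown names pass through unchanged, as in parse_service.
        for name in SERVICE_GROUPS.get(part, [part]):
            if name not in result:
                result.append(name)
    return result


print(expand_services("web"))          # ['gunicorn', 'flower', 'daphne']
print(expand_services("celery,beat"))  # ['celery_ansible', 'celery_default', 'beat']
print(expand_services("ws,web"))       # ['daphne', 'gunicorn', 'flower']
```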
def get_start_gunicorn_kwargs():
    print("\n- Start Gunicorn WSGI HTTP Server")
    prepare()
    bind = '{}:{}'.format(HTTP_HOST, HTTP_PORT)
    log_format = '%(h)s %(t)s "%(r)s" %(s)s %(b)s '

    cmd = [
        'gunicorn', 'jumpserver.wsgi',
        '-b', bind,
        '-k', 'gthread',
        '--threads', '10',
        '-w', str(WORKERS),
        '--max-requests', '4096',
        '--access-logformat', log_format,
        '--access-logfile', '-'
    ]

    if DEBUG:
        cmd.append('--reload')
    return {'cmd': cmd, 'cwd': APPS_DIR}

def get_start_daphne_kwargs():
    print("\n- Start Daphne ASGI WS Server")
    cmd = [
        'daphne', 'jumpserver.asgi:application',
        '-b', HTTP_HOST,
        '-p', str(WS_PORT),
    ]
    return {'cmd': cmd, 'cwd': APPS_DIR}

def get_start_celery_ansible_kwargs():
    print("\n- Start Celery as Distributed Task Queue: Ansible")
    return get_start_worker_kwargs('ansible', 4)


def get_start_celery_default_kwargs():
    print("\n- Start Celery as Distributed Task Queue: Celery")
    return get_start_worker_kwargs('celery', 2)

def get_start_worker_kwargs(queue, num):
    # TODO: this environment variable must be set, otherwise Ansible returns no results
    os.environ.setdefault('PYTHONOPTIMIZE', '1')
    os.environ.setdefault('ANSIBLE_FORCE_COLOR', 'True')

    if os.getuid() == 0:
        os.environ.setdefault('C_FORCE_ROOT', '1')
    server_hostname = os.environ.get("SERVER_HOSTNAME")
    if not server_hostname:
        server_hostname = '%h'

    cmd = [
        'celery', 'worker',
        '-A', 'ops',
        '-l', 'INFO',
        '-c', str(num),
        '-Q', queue,
        '-n', '{}@{}'.format(queue, server_hostname)
    ]
    return {"cmd": cmd, "cwd": APPS_DIR}

def get_start_flower_kwargs():
    print("\n- Start Flower as Task Monitor")
    if os.getuid() == 0:
        os.environ.setdefault('C_FORCE_ROOT', '1')

    cmd = [
        'celery', 'flower',
        '-A', 'ops',
        '-l', 'INFO',
        '--url_prefix=flower',
        '--auto_refresh=False',
        '--max_tasks=1000',
        '--tasks_columns=uuid,name,args,state,received,started,runtime,worker'
    ]
    return {"cmd": cmd, "cwd": APPS_DIR}

def get_start_beat_kwargs():
    print("\n- Start Beat as Periodic Task Scheduler")
    os.environ.setdefault('PYTHONOPTIMIZE', '1')
    if os.getuid() == 0:
        os.environ.setdefault('C_FORCE_ROOT', '1')

    scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
    cmd = [
        'celery', 'beat',
        '-A', 'ops',
        '-l', 'INFO',
        '--scheduler', scheduler,
        '--max-interval', '60'
    ]
    return {"cmd": cmd, 'cwd': APPS_DIR}

processes = {}
def watch_services():
    max_retry = 3
    services_retry = defaultdict(int)
    stopped_services = {}

    def check_services():
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for s, p in processes.items():
            print("{} Check service status: {} -> ".format(now, s), end='')
            try:
                p.wait(timeout=1)
            except subprocess.TimeoutExpired:
                pass
            ok = is_running(s)
            if not ok:
                stopped_services[s] = ''
                print("stopped with code: {}({})".format(p.returncode, p.pid))
            else:
                print("running at {}".format(p.pid))
                stopped_services.pop(s, None)
                services_retry.pop(s, None)

    def retry_start_stopped_services():
        for s in stopped_services:
            if services_retry[s] > max_retry:
                # Pass `s` as a %-style argument; the original passed it with
                # no placeholder, which makes logging report a formatting error.
                logging.info("Service start failed, exit: %s", s)
                EXIT_EVENT.set()
                break

            p = start_service(s)
            logging.info("> Find {} stopped, retry {}, {}".format(
                s, services_retry[s] + 1, p.pid)
            )
            processes[s] = p
            services_retry[s] += 1

    def rotate_log_if_need():
        now = datetime.datetime.now()
        tm = now.strftime('%H:%M')
        if tm != '23:59':
            return
        suffix = now.strftime('%Y-%m-%d')
        services = list(processes.keys())
        services.append('jms')

        for s in services:
            log_path = get_log_file_path(s)
            log_dir = os.path.dirname(log_path)
            filename = os.path.basename(log_path)
            pre_log_dir = os.path.join(log_dir, suffix)
            if not os.path.exists(pre_log_dir):
                os.mkdir(pre_log_dir)

            pre_log_path = os.path.join(pre_log_dir, filename)
            if os.path.isfile(log_path) and not os.path.isfile(pre_log_path):
                logging.info("Rotate log file: {} => {}".format(log_path, pre_log_path))
                shutil.copy(log_path, pre_log_path)
                with open(log_path, 'w') as f:
                    pass
        some_days_ago = now - datetime.timedelta(days=LOG_KEEP_DAYS)
        days_ago_dir = os.path.join(LOG_DIR, some_days_ago.strftime('%Y-%m-%d'))
        if os.path.exists(days_ago_dir):
            logger.info("Remove old log: {}".format(days_ago_dir))
            shutil.rmtree(days_ago_dir, ignore_errors=True)

    while not EXIT_EVENT.is_set():
        try:
            with LOCK:
                check_services()
            retry_start_stopped_services()
            rotate_log_if_need()
            time.sleep(30)
        except KeyboardInterrupt:
            print("Start stop service")
            time.sleep(1)
            break
    clean_up()

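`rotate_log_if_need`, nested inside `watch_services`, rotates each service log once the clock reads 23:59. It copies `<service>.log` into a folder named after the day, truncates the live file, and deletes the folder from `LOG_KEEP_DAYS` days earlier. The sketch below reproduces that copy-truncate logic as a standalone function. The log directory, service names, and clock are passed in so the logic can be exercised without waiting for midnight. `rotate_logs` is a hypothetical name. Like the original, it removes only the single folder that is exactly `keep_days` old.

```python
import datetime
import os
import shutil
import tempfile


def rotate_logs(log_dir, names, now, keep_days=7):
    """Copy each <name>.log into <log_dir>/<YYYY-MM-DD>/, truncate the live
    file, then delete the dated folder from exactly `keep_days` days ago."""
    day_dir = os.path.join(log_dir, now.strftime("%Y-%m-%d"))
    os.makedirs(day_dir, exist_ok=True)
    for name in names:
        src = os.path.join(log_dir, name + ".log")
        dst = os.path.join(day_dir, name + ".log")
        # Skip missing logs and logs already rotated today.
        if os.path.isfile(src) and not os.path.isfile(dst):
            shutil.copy(src, dst)
            open(src, "w").close()  # truncate in place; writers keep their fd
    old = now - datetime.timedelta(days=keep_days)
    shutil.rmtree(os.path.join(log_dir, old.strftime("%Y-%m-%d")), ignore_errors=True)
    return day_dir


# Usage with a fake clock and a throwaway directory.
with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "gunicorn.log"), "w") as f:
        f.write("GET /api/health\n")
    os.makedirs(os.path.join(d, "2020-03-05"))  # exactly 7 days before `now`
    now = datetime.datetime(2020, 3, 12, 23, 59)
    day_dir = rotate_logs(d, ["gunicorn", "jms"], now)
    with open(os.path.join(day_dir, "gunicorn.log")) as f:
        rotated_text = f.read()
    live_size = os.path.getsize(os.path.join(d, "gunicorn.log"))
    old_dir_left = os.path.exists(os.path.join(d, "2020-03-05"))

print(repr(rotated_text), live_size, old_dir_left)  # 'GET /api/health\n' 0 False
```

Truncating instead of renaming matters here because `start_service` opens each log in append mode and keeps the handle. With `O_APPEND`, the child's next write lands at the start of the emptied file rather than at its old offset.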
def start_service(s):
    services_kwargs = {
        "gunicorn": get_start_gunicorn_kwargs,
        "celery_ansible": get_start_celery_ansible_kwargs,
        "celery_default": get_start_celery_default_kwargs,
        "beat": get_start_beat_kwargs,
        "flower": get_start_flower_kwargs,
        "daphne": get_start_daphne_kwargs,
    }

    kwargs = services_kwargs.get(s)()
    pid_file = get_pid_file_path(s)

    if os.path.isfile(pid_file):
        os.unlink(pid_file)
    cmd = kwargs.pop('cmd')

    log_file_path = get_log_file_path(s)
    log_file_f = open(log_file_path, 'a')
    files_preserve.append(log_file_f)
    kwargs['stderr'] = log_file_f
    kwargs['stdout'] = log_file_f
    p = subprocess.Popen(cmd, **kwargs)
    with open(pid_file, 'w') as f:
        f.write(str(p.pid))
    return p

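`start_service` launches a service with its stdout and stderr appended to `logs/<service>.log` and records the child's PID in `tmp/<service>.pid`. `is_running` and `stop_service` later rely on that PID file. The sketch below is a self-contained version of the same pattern. `spawn_logged` is a hypothetical name. One difference is deliberate: the sketch closes the parent's log handle after `Popen`, because the child already holds its own copy of the descriptor. The script instead keeps the handle open and registers it in `files_preserve`, which `DaemonContext` leaves open when it daemonizes.

```python
import os
import subprocess
import sys
import tempfile


def spawn_logged(cmd, log_path, pid_path, cwd=None):
    """Start `cmd` with stdout and stderr appended to `log_path` and write
    its PID to `pid_path`, overwriting any stale file."""
    log_f = open(log_path, "a")
    try:
        proc = subprocess.Popen(cmd, cwd=cwd, stdout=log_f, stderr=log_f)
    finally:
        log_f.close()  # the child keeps its own duplicated descriptor
    with open(pid_path, "w") as f:
        f.write(str(proc.pid))
    return proc


# Usage: a child that writes one line to each stream.
with tempfile.TemporaryDirectory() as d:
    log_path = os.path.join(d, "demo.log")
    pid_path = os.path.join(d, "demo.pid")
    script = "import sys; print('out'); print('err', file=sys.stderr)"
    p = spawn_logged([sys.executable, "-c", script], log_path, pid_path)
    p.wait()
    with open(log_path) as f:
        log_text = f.read()
    with open(pid_path) as f:
        recorded_pid = int(f.read())

print(sorted(log_text.split()))  # ['err', 'out']
print(recorded_pid == p.pid)     # True
```

The output is sorted in the usage example because stdout and stderr are buffered differently, so the two lines can reach the file in either order.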
def start_services_and_watch(s):
    logging.info(time.ctime())
    logging.info('Jumpserver version {}, more see https://www.jumpserver.org'.format(
        __version__)
    )

    services_set = parse_service(s)
    for i in services_set:
        if is_running(i):
            show_service_status(i)
            continue
        p = start_service(i)
        time.sleep(2)
        processes[i] = p

    if not DAEMON:
        watch_services()
    else:
        show_service_status(s)
        context = get_daemon_context()
        with context:
            watch_services()

def get_daemon_context():
    daemon_pid_file = get_pid_file_path('jms')
    daemon_log_f = open(get_log_file_path('jms'), 'a')
    files_preserve.append(daemon_log_f)
    context = daemon.DaemonContext(
        pidfile=pidfile.TimeoutPIDLockFile(daemon_pid_file),
        signal_map={
            signal.SIGTERM: lambda x, y: clean_up(),
            signal.SIGHUP: 'terminate',
        },
        stdout=daemon_log_f,
        stderr=daemon_log_f,
        files_preserve=files_preserve,
        detach_process=True,
    )
    return context

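`get_daemon_context` maps SIGTERM to a lambda that calls `clean_up()` directly inside the signal handler. That call stops every service and waits on each child before the handler returns. A common lighter-weight pattern is sketched below. It assumes a POSIX system and a handler installed from the main thread. The handler only sets an event, and the main loop reacts to it. `EXIT` and `on_sigterm` are hypothetical names.

```python
import os
import signal
import threading

EXIT = threading.Event()  # hypothetical stand-in for the script's EXIT_EVENT


def on_sigterm(signum, frame):
    # Keep the handler tiny: only flag the main loop to stop. The loop then
    # does the real cleanup outside of signal-handler context.
    EXIT.set()


previous = signal.signal(signal.SIGTERM, on_sigterm)
try:
    os.kill(os.getpid(), signal.SIGTERM)  # simulate `kill <pid>`
    stopped = EXIT.wait(timeout=1)
finally:
    signal.signal(signal.SIGTERM, previous)  # restore the previous handler

print(stopped)  # True
```

If `watch_services` adopted this pattern, its `time.sleep(30)` would need to become `EXIT_EVENT.wait(30)`. Otherwise a stop request could go unnoticed for up to 30 seconds.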
def stop_service(srv, sig=15):
    services_set = parse_service(srv)
    for s in services_set:
        if not is_running(s):
            show_service_status(s)
            continue
        print("Stop service: {}".format(s), end='')
        pid = get_pid(s)
        os.kill(pid, sig)
        with LOCK:
            process = processes.pop(s, None)

        if process is None:
            try:
                process = psutil.Process(pid)
            except:
                pass
        if process is None:
            print("\033[31m No process found\033[0m")
            continue
        try:
            process.wait(1)
        except:
            pass
        for i in range(STOP_TIMEOUT):
            if i == STOP_TIMEOUT - 1:
                print("\033[31m Error\033[0m")
            if not is_running(s):
                print("\033[32m Ok\033[0m")
                break
            else:
                time.sleep(1)
                continue

    if srv == "all":
        stop_daemon_service()

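`stop_service` sends a single signal (SIGTERM by default, SIGKILL with `-f`) and then polls the PID file for up to `STOP_TIMEOUT` seconds. It prints `Error` when the timeout is reached but never escalates. Below is a terminate-then-kill sketch for a child started with `subprocess.Popen`. `stop_process` is a hypothetical name. For a PID read from a file, `psutil.Process` offers the analogous `terminate()`, `wait(timeout)`, and `kill()` methods.

```python
import signal
import subprocess
import sys


def stop_process(proc, timeout=10.0):
    """Send SIGTERM, wait up to `timeout` seconds, then fall back to SIGKILL.
    Returns the exit status (a negative signal number on POSIX)."""
    proc.terminate()  # SIGTERM on POSIX
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL cannot be caught or ignored
        return proc.wait()


# Usage: a well-behaved child exits on SIGTERM.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
print(stop_process(child, timeout=5) == -signal.SIGTERM)  # True
```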
def stop_daemon_service():
    pid = get_pid('jms')
    if pid and check_pid(pid):
        os.kill(pid, 15)


def stop_multi_services(services):
    for s in services:
        stop_service(s, sig=9)


def stop_service_force(s):
    stop_service(s, sig=9)

def clean_up():
    if not EXIT_EVENT.is_set():
        EXIT_EVENT.set()
    processes_dump = {k: v for k, v in processes.items()}
    for s1, p1 in processes_dump.items():
        stop_service(s1)
        p1.wait()

def show_service_status(s):
    services_set = parse_service(s)
    for ns in services_set:
        if is_running(ns):
            pid = get_pid(ns)
            print("{} is running: {}".format(ns, pid))
        else:
            print("{} is stopped".format(ns))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="""
        Jumpserver service control tools;
        Example: \r\n
        %(prog)s start all -d;
        """
    )
    parser.add_argument(
        'action', type=str,
        choices=("start", "stop", "restart", "status"),
        help="Action to run"
    )
    parser.add_argument(
        "service", type=str, default="all", nargs="?",
        choices=("all", "web", "task", "gunicorn", "celery", "beat", "celery,beat", "flower", "ws"),
        help="The service to start",
    )
    parser.add_argument('-d', '--daemon', nargs="?", const=1)
    parser.add_argument('-w', '--worker', type=int, nargs="?", const=4)
    parser.add_argument('-f', '--force', nargs="?", const=1)
    args = parser.parse_args()
    if args.daemon:
        DAEMON = True

    if args.worker:
        WORKERS = args.worker

    action = args.action
    srv = args.service

    if action == "start":
        start_services_and_watch(srv)
        os._exit(0)
    elif action == "stop":
        print("Stop service")
        if args.force:
            stop_service_force(srv)
        else:
            stop_service(srv)
    elif action == "restart":
        DAEMON = True
        stop_service(srv)
        time.sleep(5)
        start_services_and_watch(srv)
    else:
        show_service_status(srv)
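The `-d`, `-w`, and `-f` options use `nargs="?"` with a `const`. A bare flag therefore yields the `const` value, and an omitted flag yields `None`. The sketch below reproduces the script's parser definitions, with help strings omitted, to show how typical command lines parse. It only calls `argparse` and starts nothing.

```python
import argparse

# Same argument definitions as the script's __main__ block (help text omitted).
parser = argparse.ArgumentParser()
parser.add_argument("action", choices=("start", "stop", "restart", "status"))
parser.add_argument("service", default="all", nargs="?",
                    choices=("all", "web", "task", "gunicorn", "celery",
                             "beat", "celery,beat", "flower", "ws"))
parser.add_argument("-d", "--daemon", nargs="?", const=1)
parser.add_argument("-w", "--worker", type=int, nargs="?", const=4)
parser.add_argument("-f", "--force", nargs="?", const=1)

a = parser.parse_args(["start", "all", "-d"])
print(a.action, a.service, a.daemon, a.worker)  # start all 1 None

b = parser.parse_args(["start", "-w", "8"])
print(b.service, b.daemon, b.worker)  # all None 8

c = parser.parse_args(["restart", "web", "-w"])
print(c.worker)  # 4
```

Because `-d` accepts an optional value, placing it before the positionals (`-d start all`) makes argparse read `start` as the value of `--daemon`, and the command is then rejected. Keeping the flags after the service name, as in the script's own `start all -d` example, avoids this.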