From 80b03e73f66d63a70a04a5c9d2f5e402d5d60efb Mon Sep 17 00:00:00 2001
From: ibuler
Date: Wed, 9 Dec 2020 16:27:04 +0800
Subject: [PATCH] =?UTF-8?q?feat(celery):=20=E6=B7=BB=E5=8A=A0celery?=
 =?UTF-8?q?=E7=9A=84health=20check=E6=8E=A5=E5=8F=A3?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 apps/ops/celery/utils.py                     | 39 ++++++++++++++++++++
 apps/ops/management/commands/check_celery.py | 20 ++++++++++
 2 files changed, 59 insertions(+)
 create mode 100644 apps/ops/management/commands/check_celery.py

diff --git a/apps/ops/celery/utils.py b/apps/ops/celery/utils.py
index 0c758b70e..ff5aeb1d4 100644
--- a/apps/ops/celery/utils.py
+++ b/apps/ops/celery/utils.py
@@ -3,6 +3,8 @@
 import json
 import os
 
+import redis_lock
+import redis
 from django.conf import settings
 from django.utils.timezone import get_current_timezone
 from django.db.utils import ProgrammingError, OperationalError
@@ -105,3 +107,40 @@ def get_celery_task_log_path(task_id):
     path = os.path.join(settings.CELERY_LOG_DIR, rel_path)
     os.makedirs(os.path.dirname(path), exist_ok=True)
     return path
+
+
+def get_celery_status(min_workers=5):
+    """Report whether enough distinct celery workers answer a ping.
+
+    Every worker is pinged through the celery control API; nodes replying
+    ``pong`` are kept and the distinct names before the ``@`` are counted.
+
+    :param min_workers: minimum number of distinct names required
+    :return: True when at least ``min_workers`` distinct names replied
+    """
+    from . import app
+    inspector = app.control.inspect()
+    # `or {}` keeps the check working when ping() gives back nothing
+    ping_data = inspector.ping() or {}
+    active_nodes = [k for k, v in ping_data.items() if v.get('ok') == 'pong']
+    active_queue_worker = {n.split('@')[0] for n in active_nodes if n}
+    if len(active_queue_worker) < min_workers:
+        print("Not all celery worker worked")
+        return False
+    return True
+
+
+def get_beat_status():
+    """Report whether the beat start lock is currently held in redis.
+
+    :return: True when ``beat-distribute-start-lock`` is held, False when
+        it is not held or redis cannot be reached in time
+    """
+    CONFIG = settings.CONFIG
+    r = redis.Redis(host=CONFIG.REDIS_HOST, port=CONFIG.REDIS_PORT, password=CONFIG.REDIS_PASSWORD)
+    lock = redis_lock.Lock(r, name="beat-distribute-start-lock")
+    try:
+        return lock.locked()
+    except (redis.ConnectionError, redis.TimeoutError):
+        # An unreachable or unresponsive redis is reported as unhealthy
+        return False
diff --git a/apps/ops/management/commands/check_celery.py b/apps/ops/management/commands/check_celery.py
new file mode 100644
index 000000000..41985129d
--- /dev/null
+++ b/apps/ops/management/commands/check_celery.py
@@ -0,0 +1,20 @@
+from django.core.management.base import BaseCommand, CommandError
+
+
+class Command(BaseCommand):
+    """Fail when the celery workers or the beat are unhealthy."""
+    help = 'Ops manage commands'
+
+    def add_arguments(self, parser):
+        # Optional positional: the command also runs without it
+        parser.add_argument('check_celery', nargs='?', help='Check celery health')
+
+    def handle(self, *args, **options):
+        """Raise CommandError when the workers or the beat check fails."""
+        from ops.celery.utils import get_celery_status, get_beat_status
+
+        if not get_celery_status():
+            raise CommandError('Celery worker unhealthy')
+
+        if not get_beat_status():
+            raise CommandError('Beat unhealthy')