From 6ebe8e16bdffe0c472701c7093e79974e7121db7 Mon Sep 17 00:00:00 2001 From: ibuler Date: Mon, 21 Oct 2019 19:28:33 +0800 Subject: [PATCH] =?UTF-8?q?[Update]=20=E4=BF=AE=E6=94=B9=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=97=B6=E5=8F=AF=E8=83=BD=E8=A7=A3=E7=A0=81?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/utils/encode.py | 4 ++++ apps/ops/ws.py | 20 ++++++++++++++------ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/apps/common/utils/encode.py b/apps/common/utils/encode.py index fc9467cba..d4e9c4cbe 100644 --- a/apps/common/utils/encode.py +++ b/apps/common/utils/encode.py @@ -182,3 +182,7 @@ def encrypt_password(password, salt=None): def get_signer(): signer = Signer(settings.SECRET_KEY) return signer + + +def ensure_last_char_is_ascii(data): + remain = '' diff --git a/apps/ops/ws.py b/apps/ops/ws.py index 1d2bea3c5..d6bff86a7 100644 --- a/apps/ops/ws.py +++ b/apps/ops/ws.py @@ -3,11 +3,13 @@ import os import threading import json -from celery.result import AsyncResult +from common.utils import get_logger from .celery.utils import get_celery_task_log_path from channels.generic.websocket import JsonWebsocketConsumer +logger = get_logger(__name__) + class CeleryLogWebsocket(JsonWebsocketConsumer): disconnected = False @@ -22,6 +24,7 @@ class CeleryLogWebsocket(JsonWebsocketConsumer): self.handle_task(task_id) def handle_task(self, task_id): + logger.info("Task id: {}".format(task_id)) log_path = get_celery_task_log_path(task_id) def func(): @@ -34,19 +37,24 @@ class CeleryLogWebsocket(JsonWebsocketConsumer): continue self.send_json({'message': '\r\n'}) try: - task_log_f = open(log_path) + logger.debug('Task log path: {}'.format(log_path)) + task_log_f = open(log_path, 'rb') break except OSError: return + if not task_log_f: + return + while not self.disconnected: data = task_log_f.readline() + if data: - data = data.replace('\n', '\r\n') - self.send_json({'message': data, 'task': task_id}) - if data.startswith('Task') and data.find('succeeded'): + data = data.replace(b'\n', b'\r\n') + self.send_json({'message': data.decode(errors='ignore'), 'task': task_id}) + if data.startswith(b'Task') and data.find(b'succeeded'): break - time.sleep(0.2) + time.sleep(0.1) task_log_f.close() thread = threading.Thread(target=func) thread.start()