fix: safe_run_cmd has the risk of command injection when using shell=True

This commit is contained in:
wangruidong
2026-02-25 16:10:35 +08:00
committed by 老广
parent b71519cc44
commit d9be890e89
2 changed files with 41 additions and 16 deletions

View File

@@ -14,7 +14,7 @@ from django.utils.translation import gettext_lazy as _
from common.const.crontab import CRONTAB_AT_AM_TWO
from common.storage.ftp_file import FTPFileStorageHandler
from common.utils import get_log_keep_day, get_logger
from common.utils.safe import safe_run_cmd
from common.utils.safe import find_and_delete_empty_dirs, find_and_delete_files, truncate_file
from ops.celery.decorator import register_as_period_task
from ops.models import CeleryTaskExecution
from orgs.utils import tmp_to_root_org
@@ -60,10 +60,8 @@ def clean_ftp_log_period():
expired_day = now - datetime.timedelta(days=days)
file_store_dir = safe_join(default_storage.base_location, FTPLog.upload_to)
FTPLog.objects.filter(date_start__lt=expired_day).delete()
command = "find %s -mtime +%s -type f -exec rm -f {} \\;"
safe_run_cmd(command, (file_store_dir, days))
command = "find %s -type d -empty -delete;"
safe_run_cmd(command, (file_store_dir,))
find_and_delete_files(file_store_dir, mtime_days=days)
find_and_delete_empty_dirs(file_store_dir)
logger.info("Clean FTP file done")
@@ -75,11 +73,9 @@ def clean_celery_tasks_period():
tasks.delete()
tasks = CeleryTaskExecution.objects.filter(date_start__isnull=True)
tasks.delete()
command = "find %s -mtime +%s -name '*.log' -type f -exec rm -f {} \\;"
safe_run_cmd(command, (settings.CELERY_LOG_DIR, expire_days))
find_and_delete_files(settings.CELERY_LOG_DIR, name_pattern="*.log", mtime_days=expire_days)
celery_log_path = safe_join(settings.LOG_DIR, 'celery.log')
command = "echo > %s"
safe_run_cmd(command, (celery_log_path,))
truncate_file(celery_log_path)
def batch_delete(queryset, batch_size=3000):
@@ -124,8 +120,7 @@ def clean_expired_session_period():
batch_delete(expired_commands)
logger.info("Clean session command done")
remove_files_by_days(replay_dir, days)
command = "find %s -type d -empty -delete;"
safe_run_cmd(command, (replay_dir,))
find_and_delete_empty_dirs(replay_dir)
logger.info("Clean session replay done")

View File

@@ -1,8 +1,38 @@
import shlex
import logging
import subprocess
logger = logging.getLogger(__name__)
def safe_run_cmd(cmd_str, cmd_args=(), shell=True):
cmd_args = [shlex.quote(str(arg)) for arg in cmd_args]
cmd = cmd_str % tuple(cmd_args)
return subprocess.run(cmd, shell=shell)
def safe_run_cmd(cmd_args, shell=False):
if shell:
raise ValueError("shell=True is not allowed in safe_run_cmd. " "Pass command as a list with shell=False.")
cmd_args = [str(arg) for arg in cmd_args]
try:
return subprocess.run(cmd_args, shell=False)
except Exception as e:
logger.error("Failed to run command %s: %s", cmd_args, e)
return None
def truncate_file(file_path):
try:
with open(file_path, "w"):
pass
except Exception as e:
logger.error("Failed to truncate file %s: %s", file_path, e)
def find_and_delete_files(directory, name_pattern=None, mtime_days=None):
cmd = ["find", str(directory), "-type", "f"]
if mtime_days is not None:
cmd.extend(["-mtime", "+%s" % int(mtime_days)])
if name_pattern is not None:
cmd.extend(["-name", str(name_pattern)])
cmd.append("-delete")
return safe_run_cmd(cmd)
def find_and_delete_empty_dirs(directory):
cmd = ["find", str(directory), "-type", "d", "-empty", "-delete"]
return safe_run_cmd(cmd)