fix(RAG): fix url document rag mode (#2874)

alanchen 2025-08-05 21:52:39 +08:00 committed by GitHub
parent 26bb07f9d1
commit 4e7070b6ee
2 changed files with 97 additions and 12 deletions


@@ -9,6 +9,7 @@ This example shows how to use AWEL to build a simple rag summary example.
```
export OPENAI_API_KEY={your_openai_key}
export OPENAI_API_BASE={your_openai_base}
export MODEL_NAME={LLM_MODEL_NAME}
```
or
```
@@ -23,7 +24,7 @@ This example shows how to use AWEL to build a simple rag summary example.
curl -X POST http://127.0.0.1:5555/api/v1/awel/trigger/examples/rag/summary \
-H "Content-Type: application/json" -d '{
"url": "https://docs.dbgpt.site/docs/awel"
"url": "http://docs.dbgpt.cn/docs/awel/"
}'
"""
@@ -58,13 +59,17 @@ with DAG("dbgpt_awel_simple_rag_summary_example") as dag:
"/examples/rag/summary", methods="POST", request_body=TriggerReqBody
)
request_handle_task = RequestHandleOperator()
path_operator = MapOperator(lambda request: request["url"])
path_operator = MapOperator(lambda request: {"source": request["url"]})
# build knowledge operator
knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
# build summary assembler operator
summary_operator = SummaryAssemblerOperator(
llm_client=OpenAILLMClient(api_key=os.getenv("OPENAI_API_KEY", "your api key")),
language="en",
llm_client=OpenAILLMClient(
api_key=os.getenv("OPENAI_API_KEY", "your api key"),
api_base=os.getenv("OPENAI_API_BASE", "your api base"),
),
language="zh",
model_name=os.getenv("MODEL_NAME", "your model name"),
)
(
trigger

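The substance of the change above is the payload shape and the LLM configuration: the MapOperator now hands the knowledge operator a dict keyed by `source` instead of a bare URL string, and the OpenAI client reads its API base and model name from the environment. A standalone sketch of that reshaping (the helper name below is illustrative, not part of the project's API):

```python
import os

# Illustrative only: mirrors the MapOperator lambda and the env-driven settings
# added in this diff; it is not the project's operator code.
def to_knowledge_source(trigger_body: dict) -> dict:
    # Before the fix the downstream operator received only the raw URL string;
    # it now receives {"source": <url>} so the URL knowledge type can resolve it.
    return {"source": trigger_body["url"]}

llm_settings = {
    "api_key": os.getenv("OPENAI_API_KEY", "your api key"),
    "api_base": os.getenv("OPENAI_API_BASE", "your api base"),
    "model_name": os.getenv("MODEL_NAME", "your model name"),
}

print(to_knowledge_source({"url": "http://docs.dbgpt.cn/docs/awel/"}))
# -> {'source': 'http://docs.dbgpt.cn/docs/awel/'}
```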

@@ -37,16 +37,96 @@ class URLKnowledge(Knowledge):
"""Fetch URL document from loader."""
if self._loader:
documents = self._loader.load()
return [Document.langchain2doc(lc_document) for lc_document in documents]
else:
from langchain.document_loaders import WebBaseLoader # mypy: ignore
if self._path is not None:
web_reader = WebBaseLoader(web_path=self._path, encoding="utf8")
documents = web_reader.load()
else:
# Handle the case where self._path is None
if self._path is None:
raise ValueError("web_path cannot be None")
return [Document.langchain2doc(lc_document) for lc_document in documents]
return self._load_document_default()
def _load_document_default(self) -> List[Document]:
"""Fetch URL document with trafilatura."""
import re
import unicodedata
import requests
from bs4 import BeautifulSoup
def clean_text(text: str) -> str:
"""Clean text by removing special Unicode characters."""
if not text:
return ""
# Remove zero-width characters and other invisible Unicode characters
text = re.sub(r"[\u200b-\u200f\u2060\ufeff]", "", text)
# Remove control characters except newline, tab, and carriage return
text = re.sub(r"[\x00-\x08\x0b-\x0c\x0e-\x1f\x7f-\x9f]", "", text)
# Normalize Unicode characters
text = unicodedata.normalize("NFKC", text)
# Clean up extra whitespace
text = " ".join(text.split())
return text.strip()
if self._path is None:
    raise ValueError("web_path cannot be None")
try:
# Set user agent to avoid being blocked
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit"
"/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
# Fetch the webpage content
response = requests.get(self._path, headers=headers, timeout=30)
response.raise_for_status()
# Determine encoding
if self._encoding is not None:
response.encoding = self._encoding
elif response.encoding == "ISO-8859-1":
response.encoding = response.apparent_encoding
# Parse HTML content
soup = BeautifulSoup(response.text, "html.parser")
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Get text content
text_content = soup.get_text(strip=True)
# Clean up whitespace
lines = (line.strip() for line in text_content.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text_content = " ".join(chunk for chunk in chunks if chunk)
text_content = clean_text(text_content)
# Get page title if available
title = (
soup.title.string.strip() if soup.title and soup.title.string else ""
)
title = clean_text(title)
description = soup.find("meta", attrs={"name": "description"})
desc_content = description.get("content", "") if description else ""
desc_content = clean_text(desc_content)
# Create metadata
metadata = {
"source": self._path,
"title": title,
"encoding": response.encoding,
"description": desc_content,
}
document = Document(content=text_content, metadata=metadata)
return [document]
except Exception as e:
raise ValueError(f"Failed to parse URL content: {str(e)}")
@classmethod
def support_chunk_strategy(cls) -> List[ChunkStrategy]:
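The new `_load_document_default` fallback replaces the langchain `WebBaseLoader` dependency with plain `requests` plus BeautifulSoup. A small self-contained check of that extraction approach, run against an inline HTML snippet rather than a live URL (the HTML below is made up for illustration):

```python
from bs4 import BeautifulSoup

html = (
    "<html><head><title> AWEL Docs </title>"
    '<meta name="description" content="Agentic Workflow Expression Language">'
    "<script>console.log('dropped')</script></head>"
    "<body><p>Hello\u200b world</p></body></html>"
)

soup = BeautifulSoup(html, "html.parser")
# Drop non-content elements, as the fallback loader does.
for tag in soup(["script", "style"]):
    tag.decompose()

title = soup.title.string.strip() if soup.title and soup.title.string else ""
meta = soup.find("meta", attrs={"name": "description"})
description = meta.get("content", "") if meta else ""
body_text = " ".join(soup.body.get_text().split())

print(title)            # -> AWEL Docs
print(description)      # -> Agentic Workflow Expression Language
print(repr(body_text))  # -> 'Hello\u200b world' (clean_text above strips the zero-width char)
```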