"""HTML text splitters."""
from __future__ import annotations
import copy
import pathlib
import re
from io import StringIO
from typing import (
IO,
TYPE_CHECKING,
Any,
Callable,
Literal,
Optional,
TypedDict,
Union,
cast,
)
import requests
from langchain_core._api import beta
from langchain_core.documents import BaseDocumentTransformer, Document
from typing_extensions import override
from langchain_text_splitters.character import RecursiveCharacterTextSplitter
if TYPE_CHECKING:
from collections.abc import Iterable, Iterator, Sequence
from bs4.element import ResultSet
try:
import nltk
_HAS_NLTK = True
except ImportError:
_HAS_NLTK = False
try:
from bs4 import BeautifulSoup, Tag
from bs4.element import NavigableString, PageElement
_HAS_BS4 = True
except ImportError:
_HAS_BS4 = False
try:
from lxml import etree
_HAS_LXML = True
except ImportError:
_HAS_LXML = False
class ElementType(TypedDict):
"""Element type as typed dict."""
url: str
xpath: str
content: str
metadata: dict[str, str]
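# Example instance of ElementType (illustrative values only):
# {
#     "url": "https://example.com/page.html",
#     "xpath": "//html/body/div[1]",
#     "content": "Some extracted text",
#     "metadata": {"Header 1": "Introduction"},
# }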
# Unfortunately, BeautifulSoup doesn't define overloads for Tag.find_all.
# So doing the type resolution ourselves.
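# Example (illustrative): for soup = BeautifulSoup("<p>hi</p>", "html.parser"),
# _find_all_tags(soup, name=["p"]) returns ResultSet[Tag] directly, sparing
# callers a cast at every call site.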
def _find_all_strings(
tag: Tag,
*,
recursive: bool = True,
) -> ResultSet[NavigableString]:
return cast(
"ResultSet[NavigableString]", tag.find_all(string=True, recursive=recursive)
)
def _find_all_tags(
tag: Tag,
*,
name: Union[bool, str, list[str], None] = None,
recursive: bool = True,
) -> ResultSet[Tag]:
return cast("ResultSet[Tag]", tag.find_all(name, recursive=recursive))
class HTMLHeaderTextSplitter:
"""Split HTML content into structured Documents based on specified headers.
    Splits HTML content by detecting specified header tags (e.g., <h1>, <h2>) and
creating hierarchical Document objects that reflect the semantic structure
of the original content. For each identified section, the splitter associates
the extracted text with metadata corresponding to the encountered headers.
If no specified headers are found, the entire content is returned as a single
Document. This allows for flexible handling of HTML input, ensuring that
information is organized according to its semantic headers.
The splitter provides the option to return each HTML element as a separate
Document or aggregate them into semantically meaningful chunks. It also
gracefully handles multiple levels of nested headers, creating a rich,
hierarchical representation of the content.
Example:
.. code-block:: python
from langchain_text_splitters.html_header_text_splitter import (
HTMLHeaderTextSplitter,
)
# Define headers for splitting on h1 and h2 tags.
headers_to_split_on = [("h1", "Main Topic"), ("h2", "Sub Topic")]
splitter = HTMLHeaderTextSplitter(
headers_to_split_on=headers_to_split_on,
return_each_element=False
)
            html_content = \"\"\"
            <html>
              <body>
                <h1>Introduction</h1>
                <p>Welcome to the introduction section.</p>
                <h2>Background</h2>
                <p>Some background details here.</p>
                <h1>Conclusion</h1>
                <p>Final thoughts.</p>
              </body>
            </html>
            \"\"\"
documents = splitter.split_text(html_content)
# 'documents' now contains Document objects reflecting the hierarchy:
# - Document with metadata={"Main Topic": "Introduction"} and
# content="Introduction"
# - Document with metadata={"Main Topic": "Introduction"} and
# content="Welcome to the introduction section."
# - Document with metadata={"Main Topic": "Introduction",
# "Sub Topic": "Background"} and content="Background"
# - Document with metadata={"Main Topic": "Introduction",
# "Sub Topic": "Background"} and content="Some background details here."
# - Document with metadata={"Main Topic": "Conclusion"} and
# content="Conclusion"
# - Document with metadata={"Main Topic": "Conclusion"} and
# content="Final thoughts."
"""
def __init__(
self,
headers_to_split_on: list[tuple[str, str]],
return_each_element: bool = False, # noqa: FBT001,FBT002
) -> None:
"""Initialize with headers to split on.
Args:
headers_to_split_on: A list of (header_tag,
header_name) pairs representing the headers that define splitting
boundaries. For example, [("h1", "Header 1"), ("h2", "Header 2")]
                will split content by <h1> and <h2> tags, assigning their textual
content to the Document metadata.
return_each_element: If True, every HTML element encountered
(including headers, paragraphs, etc.) is returned as a separate
Document. If False, content under the same header hierarchy is
aggregated into fewer Documents.
"""
# Sort headers by their numeric level so that h1 < h2 < h3...
self.headers_to_split_on = sorted(
headers_to_split_on, key=lambda x: int(x[0][1:])
)
self.header_mapping = dict(self.headers_to_split_on)
self.header_tags = [tag for tag, _ in self.headers_to_split_on]
self.return_each_element = return_each_element
def split_text(self, text: str) -> list[Document]:
"""Split the given text into a list of Document objects.
Args:
text: The HTML text to split.
Returns:
A list of split Document objects. Each Document contains
`page_content` holding the extracted text and `metadata` that maps
the header hierarchy to their corresponding titles.
"""
return self.split_text_from_file(StringIO(text))
def split_text_from_url(
self, url: str, timeout: int = 10, **kwargs: Any
) -> list[Document]:
"""Fetch text content from a URL and split it into documents.
Args:
url: The URL to fetch content from.
            timeout: Timeout for the request in seconds. Defaults to 10.
**kwargs: Additional keyword arguments for the request.
Returns:
A list of split Document objects. Each Document contains
`page_content` holding the extracted text and `metadata` that maps
the header hierarchy to their corresponding titles.
Raises:
requests.RequestException: If the HTTP request fails.
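
        Example:
            A minimal usage sketch (the URL below is a placeholder; extra
            keyword arguments such as ``headers`` are forwarded to
            ``requests.get``):

            .. code-block:: python

                splitter = HTMLHeaderTextSplitter(
                    headers_to_split_on=[("h1", "Header 1")]
                )
                docs = splitter.split_text_from_url(
                    "https://example.com/page.html",
                    timeout=5,
                    headers={"User-Agent": "my-agent"},
                )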
"""
response = requests.get(url, timeout=timeout, **kwargs)
response.raise_for_status()
return self.split_text(response.text)
def split_text_from_file(self, file: Union[str, IO[str]]) -> list[Document]:
"""Split HTML content from a file into a list of Document objects.
Args:
file: A file path or a file-like object containing HTML content.
Returns:
A list of split Document objects. Each Document contains
`page_content` holding the extracted text and `metadata` that maps
the header hierarchy to their corresponding titles.
"""
if isinstance(file, str):
html_content = pathlib.Path(file).read_text(encoding="utf-8")
else:
html_content = file.read()
return list(self._generate_documents(html_content))
def _generate_documents(self, html_content: str) -> Iterator[Document]:
"""Private method that performs a DFS traversal over the DOM and yields.
Document objects on-the-fly. This approach maintains the same splitting
logic (headers vs. non-headers, chunking, etc.) while walking the DOM
explicitly in code.
Args:
html_content: The raw HTML content.
Yields:
Document objects as they are created.
"""
if not _HAS_BS4:
msg = (
"Unable to import BeautifulSoup. Please install via `pip install bs4`."
)
raise ImportError(msg)
soup = BeautifulSoup(html_content, "html.parser")
body = soup.body or soup
# Dictionary of active headers:
# key = user-defined header name (e.g. "Header 1")
# value = tuple of header_text, level, dom_depth
active_headers: dict[str, tuple[str, int, int]] = {}
current_chunk: list[str] = []
def finalize_chunk() -> Optional[Document]:
"""Finalize the accumulated chunk into a single Document."""
if not current_chunk:
return None
final_text = " \n".join(line for line in current_chunk if line.strip())
current_chunk.clear()
if not final_text.strip():
return None
final_meta = {k: v[0] for k, v in active_headers.items()}
return Document(page_content=final_text, metadata=final_meta)
# We'll use a stack for DFS traversal
stack = [body]
while stack:
node = stack.pop()
children = list(node.children)
stack.extend(
child for child in reversed(children) if isinstance(child, Tag)
)
tag = getattr(node, "name", None)
if not tag:
continue
text_elements = [
str(child).strip() for child in _find_all_strings(node, recursive=False)
]
node_text = " ".join(elem for elem in text_elements if elem)
if not node_text:
continue
dom_depth = len(list(node.parents))
# If this node is one of our headers
if tag in self.header_tags:
# If we're aggregating, finalize whatever chunk we had
if not self.return_each_element:
doc = finalize_chunk()
if doc:
yield doc
# Determine numeric level (h1->1, h2->2, etc.)
try:
level = int(tag[1:])
except ValueError:
level = 9999
# Remove any active headers that are at or deeper than this new level
headers_to_remove = [
k for k, (_, lvl, d) in active_headers.items() if lvl >= level
]
for key in headers_to_remove:
del active_headers[key]
# Add/Update the active header
header_name = self.header_mapping[tag]
active_headers[header_name] = (node_text, level, dom_depth)
# Always yield a Document for the header
header_meta = {k: v[0] for k, v in active_headers.items()}
yield Document(page_content=node_text, metadata=header_meta)
else:
headers_out_of_scope = [
k for k, (_, _, d) in active_headers.items() if dom_depth < d
]
for key in headers_out_of_scope:
del active_headers[key]
if self.return_each_element:
# Yield each element's text as its own Document
meta = {k: v[0] for k, v in active_headers.items()}
yield Document(page_content=node_text, metadata=meta)
else:
# Accumulate text in our chunk
current_chunk.append(node_text)
# If we're aggregating and have leftover chunk, yield it
if not self.return_each_element:
doc = finalize_chunk()
if doc:
yield doc
class HTMLSectionSplitter:
"""Splitting HTML files based on specified tag and font sizes.
Requires lxml package.
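
    Example:
        A minimal usage sketch (the HTML string is illustrative):

        .. code-block:: python

            from langchain_text_splitters import HTMLSectionSplitter

            splitter = HTMLSectionSplitter(
                headers_to_split_on=[("h1", "Header 1"), ("h2", "Header 2")]
            )
            html = "<html><body><h1>Intro</h1><p>Hello world.</p></body></html>"
            docs = splitter.split_text(html)
            # Each Document's metadata maps the label for the matched header
            # tag (e.g. "Header 1") to that header's text.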
"""
def __init__(
self,
headers_to_split_on: list[tuple[str, str]],
**kwargs: Any,
) -> None:
"""Create a new HTMLSectionSplitter.
Args:
headers_to_split_on: list of tuples of headers we want to track mapped to
(arbitrary) keys for metadata. Allowed header values: h1, h2, h3, h4,
                h5, h6, e.g. [("h1", "Header 1"), ("h2", "Header 2")].
**kwargs (Any): Additional optional arguments for customizations.
"""
self.headers_to_split_on = dict(headers_to_split_on)
self.xslt_path = (
pathlib.Path(__file__).parent / "xsl/converting_to_header.xslt"
).absolute()
self.kwargs = kwargs
def split_documents(self, documents: Iterable[Document]) -> list[Document]:
"""Split documents."""
texts, metadatas = [], []
for doc in documents:
texts.append(doc.page_content)
metadatas.append(doc.metadata)
results = self.create_documents(texts, metadatas=metadatas)
text_splitter = RecursiveCharacterTextSplitter(**self.kwargs)
return text_splitter.split_documents(results)
def split_text(self, text: str) -> list[Document]:
"""Split HTML text string.
Args:
text: HTML text
"""
return self.split_text_from_file(StringIO(text))
def create_documents(
self, texts: list[str], metadatas: Optional[list[dict[Any, Any]]] = None
) -> list[Document]:
"""Create documents from a list of texts."""
metadatas_ = metadatas or [{}] * len(texts)
documents = []
for i, text in enumerate(texts):
for chunk in self.split_text(text):
metadata = copy.deepcopy(metadatas_[i])
for key in chunk.metadata:
if chunk.metadata[key] == "#TITLE#":
chunk.metadata[key] = metadata["Title"]
metadata = {**metadata, **chunk.metadata}
new_doc = Document(page_content=chunk.page_content, metadata=metadata)
documents.append(new_doc)
return documents
def split_html_by_headers(self, html_doc: str) -> list[dict[str, Optional[str]]]:
"""Split an HTML document into sections based on specified header tags.
This method uses BeautifulSoup to parse the HTML content and divides it into
sections based on headers defined in `headers_to_split_on`. Each section
contains the header text, content under the header, and the tag name.
Args:
html_doc (str): The HTML document to be split into sections.
Returns:
List[Dict[str, Optional[str]]]: A list of dictionaries representing
sections.
Each dictionary contains:
* 'header': The header text or a default title for the first section.
* 'content': The content under the header.
* 'tag_name': The name of the header tag (e.g., "h1", "h2").
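
        Example:
            A sketch of the returned shape (values are illustrative):

            .. code-block:: python

                sections = splitter.split_html_by_headers(
                    "<h1>Intro</h1><p>Hello world.</p>"
                )
                # [{'header': '#TITLE#', 'content': 'Intro Hello world.',
                #   'tag_name': 'h1'}]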
"""
if not _HAS_BS4:
msg = "Unable to import BeautifulSoup/PageElement, \
please install with `pip install \
bs4`."
raise ImportError(msg)
soup = BeautifulSoup(html_doc, "html.parser")
header_names = list(self.headers_to_split_on.keys())
        sections: list[dict[str, Optional[str]]] = []
headers = _find_all_tags(soup, name=["body", *header_names])
for i, header in enumerate(headers):
if i == 0:
current_header = "#TITLE#"
current_header_tag = "h1"
section_content: list[str] = []
else:
current_header = header.text.strip()
current_header_tag = header.name
section_content = []
for element in header.next_elements:
if i + 1 < len(headers) and element == headers[i + 1]:
break
if isinstance(element, str):
section_content.append(element)
content = " ".join(section_content).strip()
if content:
sections.append(
{
"header": current_header,
"content": content,
"tag_name": current_header_tag,
}
)
return sections
def convert_possible_tags_to_header(self, html_content: str) -> str:
"""Convert specific HTML tags to headers using an XSLT transformation.
        This method applies the bundled XSLT stylesheet to the HTML content,
        converting certain tags into headers for easier parsing.
Args:
html_content (str): The HTML content to be transformed.
Returns:
str: The transformed HTML content as a string.
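
        Example:
            A round-trip sketch (which tags get converted depends on the
            bundled XSLT stylesheet):

            .. code-block:: python

                transformed = splitter.convert_possible_tags_to_header(
                    "<html><body><b>Bold heading</b></body></html>"
                )
                # Tags the stylesheet recognizes as headings come back as
                # <h1>...</h1> (or similar); everything else is unchanged.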
"""
if not _HAS_LXML:
msg = "Unable to import lxml, please install with `pip install lxml`."
raise ImportError(msg)
# use lxml library to parse html document and return xml ElementTree
# Create secure parsers to prevent XXE attacks
html_parser = etree.HTMLParser(no_network=True)
xslt_parser = etree.XMLParser(
resolve_entities=False, no_network=True, load_dtd=False
)
# Apply XSLT access control to prevent file/network access
# DENY_ALL is a predefined access control that blocks all file/network access
# Type ignore needed due to incomplete lxml type stubs
ac = etree.XSLTAccessControl.DENY_ALL # type: ignore[attr-defined]
tree = etree.parse(StringIO(html_content), html_parser)
xslt_tree = etree.parse(self.xslt_path, xslt_parser)
transform = etree.XSLT(xslt_tree, access_control=ac)
result = transform(tree)
return str(result)
def split_text_from_file(self, file: StringIO) -> list[Document]:
"""Split HTML content from a file into a list of Document objects.
        Args:
            file: A StringIO object containing the HTML content.
Returns:
A list of split Document objects.
"""
file_content = file.getvalue()
file_content = self.convert_possible_tags_to_header(file_content)
sections = self.split_html_by_headers(file_content)
return [
Document(
cast("str", section["content"]),
metadata={
self.headers_to_split_on[str(section["tag_name"])]: section[
"header"
]
},
)
for section in sections
]
@beta()
class HTMLSemanticPreservingSplitter(BaseDocumentTransformer):
"""Split HTML content preserving semantic structure.
Splits HTML content by headers into generalized chunks, preserving semantic
structure. If chunks exceed the maximum chunk size, it uses
RecursiveCharacterTextSplitter for further splitting.
    The splitter preserves full HTML elements (e.g., <table>, <ul>) and converts
links to Markdown-like links. It can also preserve images, videos, and audio
elements by converting them into Markdown format. Note that some chunks may
exceed the maximum size to maintain semantic integrity.
    .. versionadded:: 0.3.5
Example:
.. code-block:: python
from langchain_text_splitters.html import HTMLSemanticPreservingSplitter
def custom_iframe_extractor(iframe_tag):
                '''
Custom handler function to extract the 'src' attribute from an