class ExperimentalMarkdownSyntaxTextSplitter:
    """Experimental splitter that chunks Markdown along its syntax.

    The splitter keeps the whitespace of the original text intact while
    extracting structural metadata. It is a re-implementation of
    ``MarkdownHeaderTextSplitter`` with a different approach and some
    additional features.

    Key features:

    - Retains the original whitespace and formatting of the Markdown text.
    - Records the headers that are active for a chunk in its metadata.
    - Splits fenced code blocks (``` or ~~~) into their own chunks and stores
      the fence's info string (usually the language) under the "Code" key.
    - Starts a new chunk at horizontal rules (`---`, `***`, `___`).
    - Splits on all six header levels by default; override this with
      ``headers_to_split_on``.

    Args:
        headers_to_split_on: ``(marker, metadata_key)`` pairs, e.g.
            ``("#", "Header 1")``. Defaults to ``DEFAULT_HEADER_KEYS``.
        return_each_line: When True, every non-blank line is returned as its
            own document. Defaults to False.
        strip_headers: When True, header lines are removed from the chunk
            content and only reported as metadata. Defaults to True.

    Example:
        .. code-block:: python

            splitter = ExperimentalMarkdownSyntaxTextSplitter(
                headers_to_split_on=[("#", "Header 1"), ("##", "Header 2")]
            )
            for chunk in splitter.split_text(text):
                print(chunk)

    This class is currently experimental and subject to change based on
    feedback and further development.
    """

    DEFAULT_HEADER_KEYS = {
        "#": "Header 1",
        "##": "Header 2",
        "###": "Header 3",
        "####": "Header 4",
        "#####": "Header 5",
        "######": "Header 6",
    }

    # Compiled once; every pattern tolerates a missing newline, "\r\n" and
    # trailing blanks so the captured groups never carry line-ending residue.
    _HEADER_PATTERN = re.compile(r"^(#{1,6})[ \t]+(.*?)\s*$")
    _CODE_PATTERNS = (
        re.compile(r"^```[ \t]*(.*?)\s*$"),
        re.compile(r"^~~~[ \t]*(.*?)\s*$"),
    )
    _HORZ_PATTERN = re.compile(r"^(?:\*{3,}|-{3,}|_{3,})\s*$")

    def __init__(
        self,
        headers_to_split_on: Union[List[Tuple[str, str]], None] = None,
        return_each_line: bool = False,
        strip_headers: bool = True,
    ):
        """Configure the splitter.

        Args:
            headers_to_split_on: ``(marker, metadata_key)`` pairs. Only the
                listed markers start a new chunk; ``None`` or an empty list
                selects ``DEFAULT_HEADER_KEYS``.
            return_each_line: When True, ``split_text`` returns one document
                per non-blank line instead of one per chunk.
            strip_headers: When True, header lines are left out of the chunk
                content and only appear in the metadata.
        """
        self.chunks: List[Document] = []
        self.current_chunk = Document(page_content="")
        self.current_header_stack: List[Tuple[int, str]] = []
        self.strip_headers = strip_headers
        if headers_to_split_on:
            self.splittable_headers = dict(headers_to_split_on)
        else:
            # Copy so changes on one instance never leak into the class default.
            self.splittable_headers = dict(self.DEFAULT_HEADER_KEYS)

        self.return_each_line = return_each_line

    def split_text(self, text: str) -> List[Document]:
        """Split Markdown ``text`` into documents with structural metadata.

        Args:
            text: The Markdown source to split.

        Returns:
            The chunks in document order. With ``return_each_line`` enabled,
            one document per non-blank line, each carrying a copy of the
            metadata of the chunk it came from.
        """
        # Start from a clean slate so repeated calls on the same instance do
        # not return earlier chunks or inherit headers from a previous text.
        self.chunks = []
        self.current_chunk = Document(page_content="")
        self.current_header_stack = []

        raw_lines = text.splitlines(keepends=True)

        while raw_lines:
            raw_line = raw_lines.pop(0)

            # Precedence: header, then code fence, then horizontal rule.
            header_match = self._match_header(raw_line)
            if header_match:
                self._complete_chunk_doc()

                if not self.strip_headers:
                    self.current_chunk.page_content += raw_line

                # The marker length is the header depth.
                self._resolve_header_stack(
                    len(header_match.group(1)), header_match.group(2)
                )
                continue

            code_match = self._match_code(raw_line)
            if code_match:
                self._complete_chunk_doc()
                # Consumes the fenced block's lines from ``raw_lines``.
                self.current_chunk.page_content = self._resolve_code_chunk(
                    raw_line, raw_lines
                )
                self.current_chunk.metadata["Code"] = code_match.group(1)
                self._complete_chunk_doc()
                continue

            if self._match_horz(raw_line):
                # The rule itself is dropped; it only separates chunks.
                self._complete_chunk_doc()
                continue

            self.current_chunk.page_content += raw_line

        self._complete_chunk_doc()

        if self.return_each_line:
            # Each line gets its own metadata dict so that editing one
            # document does not silently change its siblings.
            return [
                Document(page_content=line, metadata=dict(chunk.metadata))
                for chunk in self.chunks
                for line in chunk.page_content.splitlines()
                if line and not line.isspace()
            ]
        return self.chunks

    def _resolve_header_stack(self, header_depth: int, header_text: str) -> None:
        """Make ``header_text`` the innermost active header.

        Every tracked header at the same or a deeper level is dropped first,
        so a new ``##`` also ends a preceding ``###`` section even when no
        other ``##`` was open.
        """
        stack = self.current_header_stack
        while stack and stack[-1][0] >= header_depth:
            stack.pop()
        stack.append((header_depth, header_text))

    def _resolve_code_chunk(self, current_line: str, raw_lines: List[str]) -> str:
        """Collect a fenced code block, consuming its lines from ``raw_lines``.

        Args:
            current_line: The opening fence line.
            raw_lines: The lines that follow; consumed lines are removed from
                the front of this list.

        Returns:
            The opening fence, the body and the closing fence joined together.
            If the fence is never closed, everything up to the end of the
            input is returned instead of being discarded.
        """
        # A block only ends at a fence made of the same character it started
        # with. Fall back to either marker if the opener is not a fence.
        if self._match_code(current_line):
            closing_markers: Tuple[str, ...] = (current_line[:3],)
        else:
            closing_markers = ("```", "~~~")

        chunk = current_line
        while raw_lines:
            raw_line = raw_lines.pop(0)
            chunk += raw_line
            if raw_line.startswith(closing_markers):
                break
        return chunk

    def _complete_chunk_doc(self) -> None:
        """Store the current chunk (unless it is blank) and start a new one."""
        chunk_content = self.current_chunk.page_content
        # Discard any empty documents
        if chunk_content and not chunk_content.isspace():
            # Apply the header stack as metadata
            for depth, value in self.current_header_stack:
                header_key = self.splittable_headers.get("#" * depth)
                if header_key is not None:
                    self.current_chunk.metadata[header_key] = value
            self.chunks.append(self.current_chunk)
        # Reset the current chunk
        self.current_chunk = Document(page_content="")

    # Match methods
    def _match_header(self, line: str) -> Union[re.Match, None]:
        """Match a configured header line.

        Group 1 is the ``#`` marker and group 2 the header text without
        surrounding whitespace. Markers that are not configured in
        ``splittable_headers`` do not match.
        """
        match = self._HEADER_PATTERN.match(line)
        if match and match.group(1) in self.splittable_headers:
            return match
        return None

    def _match_code(self, line: str) -> Union[re.Match, None]:
        """Match a code fence line; group 1 is the trimmed info string."""
        for pattern in self._CODE_PATTERNS:
            match = pattern.match(line)
            if match:
                return match
        return None

    def _match_horz(self, line: str) -> Union[re.Match, None]:
        """Match a horizontal rule made of three or more ``*``, ``-`` or ``_``."""
        return self._HORZ_PATTERN.match(line)
# Shared fixture: two header levels, a fenced code block, a horizontal rule
# and a final paragraph without a trailing newline.
EXPERIMENTAL_MARKDOWN_DOCUMENT = "\n".join(
    [
        "# My Header 1",
        "Content for header 1",
        "## Header 2",
        "Content for header 2",
        "```python",
        "def func_definition():",
        " print('Keep the whitespace consistent')",
        "```",
        "# Header 1 again",
        "We should also split on the horizontal line",
        "----",
        "This will be a new doc but with the same header metadata",
        "",
        "And it includes a new paragraph",
    ]
)


def test_experimental_markdown_syntax_text_splitter() -> None:
    """Default configuration: headers are stripped and all levels split."""
    top_level = {"Header 1": "My Header 1"}
    nested = {"Header 1": "My Header 1", "Header 2": "Header 2"}
    restarted = {"Header 1": "Header 1 again"}
    fenced_code = (
        "```python\n"
        "def func_definition():\n"
        " print('Keep the whitespace consistent')\n"
        "```\n"
    )
    closing_text = (
        "This will be a new doc but with the same header metadata\n\n"
        "And it includes a new paragraph"
    )

    expected = [
        Document(page_content="Content for header 1\n", metadata=dict(top_level)),
        Document(page_content="Content for header 2\n", metadata=dict(nested)),
        Document(page_content=fenced_code, metadata={"Code": "python", **nested}),
        Document(
            page_content="We should also split on the horizontal line\n",
            metadata=dict(restarted),
        ),
        Document(page_content=closing_text, metadata=dict(restarted)),
    ]

    splitter = ExperimentalMarkdownSyntaxTextSplitter()
    actual = splitter.split_text(EXPERIMENTAL_MARKDOWN_DOCUMENT)

    assert actual == expected


def test_experimental_markdown_syntax_text_splitter_header_configuration() -> None:
    """Only configured markers split; other headers stay in the content."""
    splitter = ExperimentalMarkdownSyntaxTextSplitter(
        headers_to_split_on=[("#", "Encabezamiento 1")]
    )

    first_section = {"Encabezamiento 1": "My Header 1"}
    second_section = {"Encabezamiento 1": "Header 1 again"}
    fenced_code = (
        "```python\n"
        "def func_definition():\n"
        " print('Keep the whitespace consistent')\n"
        "```\n"
    )
    closing_text = (
        "This will be a new doc but with the same header metadata\n\n"
        "And it includes a new paragraph"
    )

    expected = [
        Document(
            page_content="Content for header 1\n## Header 2\nContent for header 2\n",
            metadata=dict(first_section),
        ),
        Document(
            page_content=fenced_code,
            metadata={"Code": "python", **first_section},
        ),
        Document(
            page_content="We should also split on the horizontal line\n",
            metadata=dict(second_section),
        ),
        Document(page_content=closing_text, metadata=dict(second_section)),
    ]

    actual = splitter.split_text(EXPERIMENTAL_MARKDOWN_DOCUMENT)

    assert actual == expected


def test_experimental_markdown_syntax_text_splitter_with_headers() -> None:
    """With ``strip_headers=False`` each header line opens its chunk."""
    top_level = {"Header 1": "My Header 1"}
    nested = {"Header 1": "My Header 1", "Header 2": "Header 2"}
    restarted = {"Header 1": "Header 1 again"}
    fenced_code = (
        "```python\n"
        "def func_definition():\n"
        " print('Keep the whitespace consistent')\n"
        "```\n"
    )
    closing_text = (
        "This will be a new doc but with the same header metadata\n\n"
        "And it includes a new paragraph"
    )

    expected = [
        Document(
            page_content="# My Header 1\nContent for header 1\n",
            metadata=dict(top_level),
        ),
        Document(
            page_content="## Header 2\nContent for header 2\n",
            metadata=dict(nested),
        ),
        Document(page_content=fenced_code, metadata={"Code": "python", **nested}),
        Document(
            page_content=(
                "# Header 1 again\nWe should also split on the horizontal line\n"
            ),
            metadata=dict(restarted),
        ),
        Document(page_content=closing_text, metadata=dict(restarted)),
    ]

    splitter = ExperimentalMarkdownSyntaxTextSplitter(strip_headers=False)
    actual = splitter.split_text(EXPERIMENTAL_MARKDOWN_DOCUMENT)

    assert actual == expected


def test_experimental_markdown_syntax_text_splitter_split_lines() -> None:
    """With ``return_each_line=True`` every non-blank line is a document."""
    top_level = {"Header 1": "My Header 1"}
    nested = {"Header 1": "My Header 1", "Header 2": "Header 2"}
    code_meta = {"Code": "python", **nested}
    restarted = {"Header 1": "Header 1 again"}

    code_lines = [
        "```python",
        "def func_definition():",
        " print('Keep the whitespace consistent')",
        "```",
    ]
    trailing_lines = [
        "We should also split on the horizontal line",
        "This will be a new doc but with the same header metadata",
        "And it includes a new paragraph",
    ]

    expected = [
        Document(page_content="Content for header 1", metadata=dict(top_level)),
        Document(page_content="Content for header 2", metadata=dict(nested)),
        *(Document(page_content=line, metadata=dict(code_meta)) for line in code_lines),
        *(
            Document(page_content=line, metadata=dict(restarted))
            for line in trailing_lines
        ),
    ]

    splitter = ExperimentalMarkdownSyntaxTextSplitter(return_each_line=True)
    actual = splitter.split_text(EXPERIMENTAL_MARKDOWN_DOCUMENT)

    assert actual == expected