mirror of
				https://github.com/hwchase17/langchain.git
				synced 2025-10-29 23:00:18 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			471 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			471 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Markdown text splitters."""
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| import re
 | |
| from typing import Any, Optional, TypedDict, Union
 | |
| 
 | |
| from langchain_core.documents import Document
 | |
| 
 | |
| from langchain_text_splitters.base import Language
 | |
| from langchain_text_splitters.character import RecursiveCharacterTextSplitter
 | |
| 
 | |
| 
 | |
class MarkdownTextSplitter(RecursiveCharacterTextSplitter):
    """Recursive character splitter preconfigured with Markdown separators."""

    def __init__(self, **kwargs: Any) -> None:
        """Build the splitter using the separator list for ``Language.MARKDOWN``.

        Args:
            **kwargs: Forwarded unchanged to ``RecursiveCharacterTextSplitter``.
        """
        markdown_separators = self.get_separators_for_language(Language.MARKDOWN)
        super().__init__(separators=markdown_separators, **kwargs)
 | |
| 
 | |
| 
 | |
class MarkdownHeaderTextSplitter:
    """Split Markdown text into chunks tagged with the headers they fall under."""

    def __init__(
        self,
        headers_to_split_on: list[tuple[str, str]],
        return_each_line: bool = False,  # noqa: FBT001,FBT002
        strip_headers: bool = True,  # noqa: FBT001,FBT002
        custom_header_patterns: Optional[dict[str, int]] = None,
    ) -> None:
        """Create a new MarkdownHeaderTextSplitter.

        Args:
            headers_to_split_on: ``(marker, metadata_key)`` pairs for the headers
                to track, e.g. ``[("#", "Header 1"), ("##", "Header 2")]``.
            return_each_line: Return each collected block on its own instead of
                aggregating blocks that share the same header metadata.
            strip_headers: Strip split headers from the content of the chunk.
            custom_header_patterns: Optional dict mapping header patterns to their
                levels. For example: {"**": 1, "***": 2} to treat **Header** as
                level 1 and ***Header*** as level 2 headers.
        """
        # Output line-by-line or aggregated into chunks w/ common headers
        self.return_each_line = return_each_line
        # Try the longest markers first so that "##" is tested before "#"
        self.headers_to_split_on = sorted(
            headers_to_split_on, key=lambda split: len(split[0]), reverse=True
        )
        # Whether header lines are removed from the chunk content
        self.strip_headers = strip_headers
        # Custom header patterns with their levels
        self.custom_header_patterns = custom_header_patterns or {}

    def _is_custom_header(self, line: str, sep: str) -> bool:
        """Check if line matches a custom header pattern.

        Args:
            line: The line to check
            sep: The separator pattern to match

        Returns:
            True if ``sep`` is a registered custom pattern and ``line`` is some
            non-separator content wrapped in ``sep`` on both sides.
        """
        if sep not in self.custom_header_patterns:
            return False

        # Escape special regex characters in the separator
        escaped_sep = re.escape(sep)
        # Match exactly one separator at start and end with content in between
        pattern = (
            f"^{escaped_sep}(?!{escaped_sep})(.+?)(?<!{escaped_sep}){escaped_sep}$"
        )

        match = re.match(pattern, line)
        if not match:
            return False

        # Valid header only if there is actual content, i.e. something other
        # than whitespace and separator characters between the markers
        content = match.group(1).strip()
        return bool(content) and not all(c in sep for c in content.replace(" ", ""))

    def aggregate_lines_to_chunks(self, lines: list[LineType]) -> list[Document]:
        """Combine lines with common metadata into chunks.

        Args:
            lines: Line of text / associated header metadata

        Returns:
            One ``Document`` per aggregated chunk, in input order.
        """
        return [
            Document(page_content=chunk["content"], metadata=chunk["metadata"])
            for chunk in self._merge_lines(lines)
        ]

    def _merge_lines(self, lines: list[LineType]) -> list[LineType]:
        """Merge consecutive entries of ``lines`` that belong to the same chunk.

        The entries of ``lines`` are not modified; merged content is built on
        fresh dicts.
        """
        merged: list[LineType] = []

        for line in lines:
            previous = merged[-1] if merged else None
            if previous is not None and previous["metadata"] == line["metadata"]:
                # Same metadata as the previous chunk: extend its content
                previous["content"] += "  \n" + line["content"]
            elif (
                previous is not None
                and not self.strip_headers
                # may be issues if other metadata is present
                and len(previous["metadata"]) < len(line["metadata"])
                # startswith() rather than [0] so an empty last line cannot
                # raise IndexError
                and previous["content"].split("\n")[-1].startswith("#")
            ):
                # The previous chunk ends with a kept header line and has fewer
                # metadata entries than the current line: fold the current line
                # into it and adopt the current line's metadata
                previous["content"] += "  \n" + line["content"]
                previous["metadata"] = line["metadata"]
            else:
                # Start a new chunk on a copy so the caller's dict is untouched
                merged.append(
                    {"content": line["content"], "metadata": line["metadata"]}
                )

        return merged

    def split_text(self, text: str) -> list[Document]:
        """Split markdown file.

        Args:
            text: Markdown file

        Returns:
            The resulting documents; aggregated by common header metadata unless
            ``return_each_line`` is set.
        """
        lines_with_metadata = self._collect_lines_with_metadata(text)

        # lines_with_metadata has each line with associated header metadata
        # aggregate these into chunks based on common metadata
        if not self.return_each_line:
            return self.aggregate_lines_to_chunks(lines_with_metadata)
        return [
            Document(page_content=chunk["content"], metadata=chunk["metadata"])
            for chunk in lines_with_metadata
        ]

    def _collect_lines_with_metadata(self, text: str) -> list[LineType]:
        """Walk ``text`` line by line and group content under header metadata.

        Every line is stripped of surrounding whitespace and non-printable
        characters. Lines inside a fenced code block are kept as content and are
        never tested against the header markers.
        """
        # Final output
        lines_with_metadata: list[LineType] = []
        # Content and metadata of the chunk currently being processed
        current_content: list[str] = []
        current_metadata: dict[str, str] = {}
        # Keep track of the nested header structure
        header_stack: list[HeaderType] = []
        initial_metadata: dict[str, str] = {}

        in_code_block = False
        opening_fence = ""

        for line in text.split("\n"):
            # Keep only visible text
            stripped_line = "".join(filter(str.isprintable, line.strip()))
            if not in_code_block:
                # A second ``` on the same line means an inline span, not a fence
                if stripped_line.startswith("```") and stripped_line.count("```") == 1:
                    in_code_block = True
                    opening_fence = "```"
                elif stripped_line.startswith("~~~"):
                    in_code_block = True
                    opening_fence = "~~~"
            elif stripped_line.startswith(opening_fence):
                in_code_block = False
                opening_fence = ""

            if in_code_block:
                current_content.append(stripped_line)
                continue

            # Check each line against each of the header types (e.g., #, ##)
            for sep, name in self.headers_to_split_on:
                is_standard_header = stripped_line.startswith(sep) and (
                    # Header with no text OR header is followed by space
                    # Both are valid conditions that sep is being used a header
                    len(stripped_line) == len(sep) or stripped_line[len(sep)] == " "
                )
                is_custom_header = self._is_custom_header(stripped_line, sep)

                # Check if line matches either standard or custom header pattern
                if is_standard_header or is_custom_header:
                    # Ensure we are tracking the header as metadata
                    if name is not None:
                        # Get the current header level
                        if sep in self.custom_header_patterns:
                            current_header_level = self.custom_header_patterns[sep]
                        else:
                            current_header_level = sep.count("#")

                        # Pop headers of the same or deeper level from the stack
                        while (
                            header_stack
                            and header_stack[-1]["level"] >= current_header_level
                        ):
                            popped_header = header_stack.pop()
                            # Clear the metadata for the popped header
                            if popped_header["name"] in initial_metadata:
                                initial_metadata.pop(popped_header["name"])

                        if is_custom_header:
                            # For custom headers like **Header**, extract text
                            # between patterns
                            header_text = stripped_line[len(sep) : -len(sep)].strip()
                        else:
                            # For standard headers like # Header, extract text
                            # after the separator
                            header_text = stripped_line[len(sep) :].strip()

                        header: HeaderType = {
                            "level": current_header_level,
                            "name": name,
                            "data": header_text,
                        }
                        header_stack.append(header)
                        # Update initial_metadata with the current header
                        initial_metadata[name] = header["data"]

                    # Flush the content gathered before this header, if any,
                    # under the metadata that was active for it
                    if current_content:
                        lines_with_metadata.append(
                            {
                                "content": "\n".join(current_content),
                                "metadata": current_metadata.copy(),
                            }
                        )
                        current_content.clear()

                    if not self.strip_headers:
                        current_content.append(stripped_line)

                    break
            else:
                if stripped_line:
                    current_content.append(stripped_line)
                elif current_content:
                    # A blank line closes the block gathered so far
                    lines_with_metadata.append(
                        {
                            "content": "\n".join(current_content),
                            "metadata": current_metadata.copy(),
                        }
                    )
                    current_content.clear()

            current_metadata = initial_metadata.copy()

        if current_content:
            lines_with_metadata.append(
                {
                    "content": "\n".join(current_content),
                    "metadata": current_metadata,
                }
            )

        return lines_with_metadata
 | |
| 
 | |
| 
 | |
class LineType(TypedDict):
    """A block of text paired with the header metadata that applies to it."""

    # Text of the block
    content: str
    # Header metadata key -> header text in effect for this block
    metadata: dict[str, str]
 | |
| 
 | |
| 
 | |
class HeaderType(TypedDict):
    """A header encountered while walking a Markdown document."""

    # Nesting level of the header
    level: int
    # Metadata key under which the header text is stored
    name: str
    # Text of the header
    data: str
 | |
| 
 | |
| 
 | |
class ExperimentalMarkdownSyntaxTextSplitter:
    """An experimental text splitter for handling Markdown syntax.

    This splitter aims to retain the exact whitespace of the original text while
    extracting structured metadata, such as headers. It is a re-implementation of the
    MarkdownHeaderTextSplitter with notable changes to the approach and
    additional features.

    Key Features:

    * Retains the original whitespace and formatting of the Markdown text.
    * Records the enclosing headers as metadata on every chunk.
    * Splits out code blocks and includes the language in the "Code" metadata key.
    * Splits text on horizontal rules (`---`, `***`, `___`) as well.
    * Defaults to sensible splitting behavior, which can be overridden using the
      ``headers_to_split_on`` parameter.

    Example:

        .. code-block:: python

            headers_to_split_on = [
                ("#", "Header 1"),
                ("##", "Header 2"),
            ]
            splitter = ExperimentalMarkdownSyntaxTextSplitter(
                headers_to_split_on=headers_to_split_on
            )
            chunks = splitter.split_text(text)
            for chunk in chunks:
                print(chunk)

    This class is currently experimental and subject to change based on feedback and
    further development.
    """

    DEFAULT_HEADER_KEYS = {
        "#": "Header 1",
        "##": "Header 2",
        "###": "Header 3",
        "####": "Header 4",
        "#####": "Header 5",
        "######": "Header 6",
    }

    # Compiled once; each is applied to a single line (terminator included)
    _HEADER_RE = re.compile(r"^(#{1,6}) (.*)")
    _CODE_FENCE_RE = re.compile(r"^(?:```|~~~)(.*)")
    # Three or more of the same rule character, then only the line terminator.
    # The terminator is optional so a rule on the last, unterminated line and
    # rules in CRLF text are recognised too.
    _HORZ_RULE_RE = re.compile(r"^(?:\*{3,}|-{3,}|_{3,})[\r\n]*\Z")

    def __init__(
        self,
        headers_to_split_on: Union[list[tuple[str, str]], None] = None,
        return_each_line: bool = False,  # noqa: FBT001,FBT002
        strip_headers: bool = True,  # noqa: FBT001,FBT002
    ) -> None:
        """Initialize the text splitter with header splitting and formatting options.

        Args:
            headers_to_split_on (Union[list[tuple[str, str]], None]):
                A list of tuples, where each tuple contains a header marker
                (e.g., "#") and its corresponding metadata key. If None or
                empty, ``DEFAULT_HEADER_KEYS`` is used.
            return_each_line (bool):
                Whether to return each line as an individual chunk.
                Defaults to False, which aggregates lines into larger chunks.
            strip_headers (bool):
                Whether to exclude headers from the resulting chunks.
                Defaults to True.
        """
        self.chunks: list[Document] = []
        self.current_chunk = Document(page_content="")
        self.current_header_stack: list[tuple[int, str]] = []
        self.strip_headers = strip_headers
        if headers_to_split_on:
            self.splittable_headers = dict(headers_to_split_on)
        else:
            # Copy so the instance never aliases the class-level default map
            self.splittable_headers = dict(self.DEFAULT_HEADER_KEYS)

        self.return_each_line = return_each_line

    def split_text(self, text: str) -> list[Document]:
        """Split the input text into structured chunks.

        The text is processed line by line. A configured header, a code fence
        or a horizontal rule closes the chunk being built; any other line is
        appended to it verbatim.

        Args:
            text (str): The input text to be split into chunks.

        Returns:
            List[Document]: A list of `Document` objects representing the structured
            chunks of the input text. If `return_each_line` is enabled, each
            non-blank line is returned as a separate `Document`.
        """
        # Reset the state for each new file processed. Fresh containers are
        # bound (rather than cleared) because the list returned by a previous
        # call is this same ``self.chunks`` object.
        self.chunks = []
        self.current_chunk = Document(page_content="")
        self.current_header_stack = []

        raw_lines = text.splitlines(keepends=True)

        while raw_lines:
            raw_line = raw_lines.pop(0)
            header_match = self._match_header(raw_line)
            code_match = self._match_code(raw_line)
            horz_match = self._match_horz(raw_line)
            if header_match:
                self._complete_chunk_doc()

                if not self.strip_headers:
                    self.current_chunk.page_content += raw_line

                # add the header to the stack
                header_depth = len(header_match.group(1))
                header_text = header_match.group(2)
                self._resolve_header_stack(header_depth, header_text)
            elif code_match:
                self._complete_chunk_doc()
                # Consumes lines from raw_lines up to and including the
                # closing fence
                self.current_chunk.page_content = self._resolve_code_chunk(
                    raw_line, raw_lines
                )
                self.current_chunk.metadata["Code"] = code_match.group(1)
                self._complete_chunk_doc()
            elif horz_match:
                # The rule line itself is dropped; it only ends the chunk
                self._complete_chunk_doc()
            else:
                self.current_chunk.page_content += raw_line

        self._complete_chunk_doc()
        # I don't see why `return_each_line` is a necessary feature of this splitter.
        # It's easy enough to do outside of the class and the caller can have more
        # control over it.
        if self.return_each_line:
            return [
                Document(page_content=line, metadata=chunk.metadata)
                for chunk in self.chunks
                for line in chunk.page_content.splitlines()
                if line and not line.isspace()
            ]
        return self.chunks

    def _resolve_header_stack(self, header_depth: int, header_text: str) -> None:
        """Push a header, first dropping stack entries at the same or deeper depth."""
        for i, (depth, _) in enumerate(self.current_header_stack):
            if depth >= header_depth:
                # Truncate everything from this level onward
                self.current_header_stack = self.current_header_stack[:i]
                break
        self.current_header_stack.append((header_depth, header_text))

    def _resolve_code_chunk(self, current_line: str, raw_lines: list[str]) -> str:
        """Collect a fenced code block, consuming its lines from ``raw_lines``.

        Args:
            current_line: The opening fence line.
            raw_lines: Remaining lines of the document; lines belonging to the
                block are popped from the front of this list.

        Returns:
            The opening fence, the body and the closing fence joined together.
            If no closing fence is found, everything up to the end of the input
            is returned instead of being discarded.
        """
        chunk = current_line
        while raw_lines:
            raw_line = raw_lines.pop(0)
            chunk += raw_line
            if self._match_code(raw_line):
                break
        return chunk

    def _complete_chunk_doc(self) -> None:
        """Store the current chunk unless it is blank, then start a new one."""
        chunk_content = self.current_chunk.page_content
        # Discard any empty documents
        if chunk_content and not chunk_content.isspace():
            # Apply the header stack as metadata
            for depth, value in self.current_header_stack:
                header_key = self.splittable_headers.get("#" * depth)
                self.current_chunk.metadata[header_key] = value
            self.chunks.append(self.current_chunk)
        # Reset the current chunk
        self.current_chunk = Document(page_content="")

    # Match methods
    def _match_header(self, line: str) -> Union[re.Match[str], None]:
        """Match a ``#``-style header line whose marker is configured for splitting."""
        match = self._HEADER_RE.match(line)
        # Only matches on the configured headers
        if match and match.group(1) in self.splittable_headers:
            return match
        return None

    def _match_code(self, line: str) -> Union[re.Match[str], None]:
        """Match a line starting with a code fence; group 1 is the rest of the line."""
        return self._CODE_FENCE_RE.match(line)

    def _match_horz(self, line: str) -> Union[re.Match[str], None]:
        """Match a horizontal rule line made of ``*``, ``-`` or ``_`` characters."""
        return self._HORZ_RULE_RE.match(line)
 |