text-splitters: Add ruff rule UP (pyupgrade) (#31841)

See https://docs.astral.sh/ruff/rules/#pyupgrade-up
All auto-fixed except `typing.AbstractSet` -> `collections.abc.Set`
Christophe Bornet, 2025-07-03 16:11:35 +02:00 (committed by GitHub)
parent 911b0b69ea
commit 802d2bf249
13 changed files with 106 additions and 115 deletions
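Most of the diff below is mechanical. On the `py39` target, ruff's UP006 (non-PEP 585 annotation) and UP035 (deprecated import) fixes replace deprecated `typing` aliases (`List`, `Dict`, `Tuple`, `Type`, ...) with builtin generics and `collections.abc` imports. A minimal sketch of the rewrite, illustrative rather than taken from the diff:

```python
# Before: deprecated typing aliases, flagged by UP006/UP035.
from typing import Dict, List, Optional

def split(text: str, seps: Optional[List[str]] = None) -> Dict[str, List[str]]:
    return {s: text.split(s) for s in (seps or ["\n"])}

# After `ruff check --fix` with UP selected: builtin generics (PEP 585).
# Optional/Union survive because UP007 is ignored (see pyproject.toml below).
from typing import Optional

def split(text: str, seps: Optional[list[str]] = None) -> dict[str, list[str]]:
    return {s: text.split(s) for s in (seps or ["\n"])}
```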

View File

@@ -3,19 +3,14 @@ from __future__ import annotations
 import copy
 import logging
 from abc import ABC, abstractmethod
+from collections.abc import Collection, Iterable, Sequence, Set
 from dataclasses import dataclass
 from enum import Enum
 from typing import (
-    AbstractSet,
     Any,
     Callable,
-    Collection,
-    Iterable,
-    List,
     Literal,
     Optional,
-    Sequence,
-    Type,
     TypeVar,
     Union,
 )
@@ -64,12 +59,12 @@ class TextSplitter(BaseDocumentTransformer, ABC):
         self._strip_whitespace = strip_whitespace

     @abstractmethod
-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split text into multiple components."""

     def create_documents(
         self, texts: list[str], metadatas: Optional[list[dict[Any, Any]]] = None
-    ) -> List[Document]:
+    ) -> list[Document]:
         """Create documents from a list of texts."""
         _metadatas = metadatas or [{}] * len(texts)
         documents = []
@@ -87,7 +82,7 @@ class TextSplitter(BaseDocumentTransformer, ABC):
             documents.append(new_doc)
         return documents

-    def split_documents(self, documents: Iterable[Document]) -> List[Document]:
+    def split_documents(self, documents: Iterable[Document]) -> list[Document]:
         """Split documents."""
         texts, metadatas = [], []
         for doc in documents:
@@ -95,7 +90,7 @@ class TextSplitter(BaseDocumentTransformer, ABC):
             metadatas.append(doc.metadata)
         return self.create_documents(texts, metadatas=metadatas)

-    def _join_docs(self, docs: List[str], separator: str) -> Optional[str]:
+    def _join_docs(self, docs: list[str], separator: str) -> Optional[str]:
         text = separator.join(docs)
         if self._strip_whitespace:
             text = text.strip()
@@ -104,13 +99,13 @@ class TextSplitter(BaseDocumentTransformer, ABC):
         else:
             return text

-    def _merge_splits(self, splits: Iterable[str], separator: str) -> List[str]:
+    def _merge_splits(self, splits: Iterable[str], separator: str) -> list[str]:
         # We now want to combine these smaller pieces into medium size
         # chunks to send to the LLM.
         separator_len = self._length_function(separator)

         docs = []
-        current_doc: List[str] = []
+        current_doc: list[str] = []
         total = 0
         for d in splits:
             _len = self._length_function(d)
@@ -169,10 +164,10 @@ class TextSplitter(BaseDocumentTransformer, ABC):

     @classmethod
     def from_tiktoken_encoder(
-        cls: Type[TS],
+        cls: type[TS],
         encoding_name: str = "gpt2",
         model_name: Optional[str] = None,
-        allowed_special: Union[Literal["all"], AbstractSet[str]] = set(),
+        allowed_special: Union[Literal["all"], Set[str]] = set(),
         disallowed_special: Union[Literal["all"], Collection[str]] = "all",
         **kwargs: Any,
     ) -> TS:
@@ -225,7 +220,7 @@ class TokenTextSplitter(TextSplitter):
         self,
         encoding_name: str = "gpt2",
         model_name: Optional[str] = None,
-        allowed_special: Union[Literal["all"], AbstractSet[str]] = set(),
+        allowed_special: Union[Literal["all"], Set[str]] = set(),
         disallowed_special: Union[Literal["all"], Collection[str]] = "all",
         **kwargs: Any,
     ) -> None:
@@ -248,7 +243,7 @@ class TokenTextSplitter(TextSplitter):
         self._allowed_special = allowed_special
         self._disallowed_special = disallowed_special

-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Splits the input text into smaller chunks based on tokenization.

         This method uses a custom tokenizer configuration to encode the input text
@@ -264,7 +259,7 @@ class TokenTextSplitter(TextSplitter):
             of the input text based on the tokenization and chunking rules.
         """

-        def _encode(_text: str) -> List[int]:
+        def _encode(_text: str) -> list[int]:
             return self._tokenizer.encode(
                 _text,
                 allowed_special=self._allowed_special,
@@ -320,15 +315,15 @@ class Tokenizer:
     """Overlap in tokens between chunks"""
     tokens_per_chunk: int
     """Maximum number of tokens per chunk"""
-    decode: Callable[[List[int]], str]
+    decode: Callable[[list[int]], str]
     """ Function to decode a list of token ids to a string"""
-    encode: Callable[[str], List[int]]
+    encode: Callable[[str], list[int]]
     """ Function to encode a string to a list of token ids"""


-def split_text_on_tokens(*, text: str, tokenizer: Tokenizer) -> List[str]:
+def split_text_on_tokens(*, text: str, tokenizer: Tokenizer) -> list[str]:
     """Split incoming text and return chunks using tokenizer."""
-    splits: List[str] = []
+    splits: list[str] = []
     input_ids = tokenizer.encode(text)
     start_idx = 0
     cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
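The `allowed_special` annotations above carry the one change ruff could not auto-fix: `typing.AbstractSet` is a deprecated alias of `collections.abc.Set`, so it was renamed by hand. Callers are unaffected, since `set` and `frozenset` both register against that ABC; a quick sanity sketch (the `check` helper is hypothetical, not from the codebase):

```python
from collections.abc import Set
from typing import Literal, Union

# Mirrors the rewritten parameter: "all" or any set-like collection of strings.
def check(allowed_special: Union[Literal["all"], Set[str]]) -> bool:
    return allowed_special == "all" or isinstance(allowed_special, Set)

assert check("all")
assert check({"<|endoftext|>"})  # a plain set satisfies collections.abc.Set
assert check(frozenset())        # and so does a frozenset
```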

View File

@@ -1,7 +1,7 @@
 from __future__ import annotations

 import re
-from typing import Any, List, Literal, Optional, Union
+from typing import Any, Literal, Optional, Union

 from langchain_text_splitters.base import Language, TextSplitter

@@ -17,7 +17,7 @@ class CharacterTextSplitter(TextSplitter):
         self._separator = separator
         self._is_separator_regex = is_separator_regex

-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split into chunks without re-inserting lookaround separators."""
         # 1. Determine split pattern: raw regex or escaped literal
         sep_pattern = (
@@ -46,7 +46,7 @@ class CharacterTextSplitter(TextSplitter):

 def _split_text_with_regex(
     text: str, separator: str, keep_separator: Union[bool, Literal["start", "end"]]
-) -> List[str]:
+) -> list[str]:
     # Now that we have the separator, split the text
     if separator:
         if keep_separator:
@@ -80,7 +80,7 @@ class RecursiveCharacterTextSplitter(TextSplitter):
     def __init__(
         self,
-        separators: Optional[List[str]] = None,
+        separators: Optional[list[str]] = None,
         keep_separator: Union[bool, Literal["start", "end"]] = True,
         is_separator_regex: bool = False,
         **kwargs: Any,
@@ -90,7 +90,7 @@ class RecursiveCharacterTextSplitter(TextSplitter):
         self._separators = separators or ["\n\n", "\n", " ", ""]
         self._is_separator_regex = is_separator_regex

-    def _split_text(self, text: str, separators: List[str]) -> List[str]:
+    def _split_text(self, text: str, separators: list[str]) -> list[str]:
         """Split incoming text and return chunks."""
         final_chunks = []
         # Get appropriate separator to use
@@ -130,7 +130,7 @@ class RecursiveCharacterTextSplitter(TextSplitter):
             final_chunks.extend(merged_text)
         return final_chunks

-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split the input text into smaller chunks based on predefined separators.

         Args:
@@ -161,7 +161,7 @@ class RecursiveCharacterTextSplitter(TextSplitter):
         return cls(separators=separators, is_separator_regex=True, **kwargs)

     @staticmethod
-    def get_separators_for_language(language: Language) -> List[str]:
+    def get_separators_for_language(language: Language) -> list[str]:
         """Retrieve a list of separators specific to the given language.

         Args:

View File

@@ -3,17 +3,13 @@ from __future__ import annotations
 import copy
 import pathlib
 import re
+from collections.abc import Iterable, Sequence
 from io import StringIO
 from typing import (
     Any,
     Callable,
-    Dict,
-    Iterable,
-    List,
     Literal,
     Optional,
-    Sequence,
-    Tuple,
     TypedDict,
     Union,
     cast,
@@ -32,7 +28,7 @@ class ElementType(TypedDict):
     url: str
     xpath: str
     content: str
-    metadata: Dict[str, str]
+    metadata: dict[str, str]


 class HTMLHeaderTextSplitter:
@@ -115,7 +111,7 @@ class HTMLHeaderTextSplitter:
     def __init__(
         self,
-        headers_to_split_on: List[Tuple[str, str]],
+        headers_to_split_on: list[tuple[str, str]],
         return_each_element: bool = False,
     ) -> None:
         """Initialize with headers to split on.
@@ -134,7 +130,7 @@
         self.header_tags = [tag for tag, _ in self.headers_to_split_on]
         self.return_each_element = return_each_element

-    def split_text(self, text: str) -> List[Document]:
+    def split_text(self, text: str) -> list[Document]:
         """Split the given text into a list of Document objects.

         Args:
@@ -147,7 +143,7 @@
     def split_text_from_url(
         self, url: str, timeout: int = 10, **kwargs: Any
-    ) -> List[Document]:
+    ) -> list[Document]:
         """Fetch text content from a URL and split it into documents.

         Args:
@@ -166,7 +162,7 @@
         response.raise_for_status()
         return self.split_text(response.text)

-    def split_text_from_file(self, file: Any) -> List[Document]:
+    def split_text_from_file(self, file: Any) -> list[Document]:
         """Split HTML content from a file into a list of Document objects.

         Args:
@@ -176,7 +172,7 @@
             A list of split Document objects.
         """
         if isinstance(file, str):
-            with open(file, "r", encoding="utf-8") as f:
+            with open(file, encoding="utf-8") as f:
                 html_content = f.read()
         else:
             html_content = file.read()
@@ -208,8 +204,8 @@
         # Dictionary of active headers:
         # key = user-defined header name (e.g. "Header 1")
         # value = (header_text, level, dom_depth)
-        active_headers: Dict[str, Tuple[str, int, int]] = {}
-        current_chunk: List[str] = []
+        active_headers: dict[str, tuple[str, int, int]] = {}
+        current_chunk: list[str] = []

         def finalize_chunk() -> Optional[Document]:
             """Finalize the accumulated chunk into a single Document."""
@@ -308,7 +304,7 @@
     def __init__(
         self,
-        headers_to_split_on: List[Tuple[str, str]],
+        headers_to_split_on: list[tuple[str, str]],
         **kwargs: Any,
     ) -> None:
         """Create a new HTMLSectionSplitter.
@@ -326,7 +322,7 @@
         ).absolute()
         self.kwargs = kwargs

-    def split_documents(self, documents: Iterable[Document]) -> List[Document]:
+    def split_documents(self, documents: Iterable[Document]) -> list[Document]:
         """Split documents."""
         texts, metadatas = [], []
         for doc in documents:
@@ -338,7 +334,7 @@
         return text_splitter.split_documents(results)

-    def split_text(self, text: str) -> List[Document]:
+    def split_text(self, text: str) -> list[Document]:
         """Split HTML text string.

         Args:
@@ -364,7 +360,7 @@
             documents.append(new_doc)
         return documents

-    def split_html_by_headers(self, html_doc: str) -> List[Dict[str, Optional[str]]]:
+    def split_html_by_headers(self, html_doc: str) -> list[dict[str, Optional[str]]]:
         """Split an HTML document into sections based on specified header tags.

         This method uses BeautifulSoup to parse the HTML content and divides it into
@@ -466,7 +462,7 @@
         result = transform(tree)
         return str(result)

-    def split_text_from_file(self, file: Any) -> List[Document]:
+    def split_text_from_file(self, file: Any) -> list[Document]:
         """Split HTML content from a file into a list of Document objects.

         Args:
@@ -571,23 +567,23 @@ class HTMLSemanticPreservingSplitter(BaseDocumentTransformer):
     def __init__(
         self,
-        headers_to_split_on: List[Tuple[str, str]],
+        headers_to_split_on: list[tuple[str, str]],
         *,
         max_chunk_size: int = 1000,
         chunk_overlap: int = 0,
-        separators: Optional[List[str]] = None,
-        elements_to_preserve: Optional[List[str]] = None,
+        separators: Optional[list[str]] = None,
+        elements_to_preserve: Optional[list[str]] = None,
         preserve_links: bool = False,
         preserve_images: bool = False,
         preserve_videos: bool = False,
         preserve_audio: bool = False,
-        custom_handlers: Optional[Dict[str, Callable[[Any], str]]] = None,
+        custom_handlers: Optional[dict[str, Callable[[Any], str]]] = None,
         stopword_removal: bool = False,
         stopword_lang: str = "english",
         normalize_text: bool = False,
-        external_metadata: Optional[Dict[str, str]] = None,
-        allowlist_tags: Optional[List[str]] = None,
-        denylist_tags: Optional[List[str]] = None,
+        external_metadata: Optional[dict[str, str]] = None,
+        allowlist_tags: Optional[list[str]] = None,
+        denylist_tags: Optional[list[str]] = None,
         preserve_parent_metadata: bool = False,
         keep_separator: Union[bool, Literal["start", "end"]] = True,
     ):
@@ -654,7 +650,7 @@
                 "Could not import nltk. Please install it with 'pip install nltk'."
             )

-    def split_text(self, text: str) -> List[Document]:
+    def split_text(self, text: str) -> list[Document]:
         """Splits the provided HTML text into smaller chunks based on the configuration.

         Args:
@@ -677,7 +673,7 @@
     def transform_documents(
         self, documents: Sequence[Document], **kwargs: Any
-    ) -> List[Document]:
+    ) -> list[Document]:
         """Transform sequence of documents by splitting them."""
         transformed = []
         for doc in documents:
@@ -776,7 +772,7 @@
         return text

-    def _process_html(self, soup: Any) -> List[Document]:
+    def _process_html(self, soup: Any) -> list[Document]:
         """Processes the HTML content using BeautifulSoup and splits it using headers.

         Args:
@@ -785,10 +781,10 @@
         Returns:
             List[Document]: A list of Document objects containing the split content.
         """
-        documents: List[Document] = []
-        current_headers: Dict[str, str] = {}
-        current_content: List[str] = []
-        preserved_elements: Dict[str, str] = {}
+        documents: list[Document] = []
+        current_headers: dict[str, str] = {}
+        current_content: list[str] = []
+        preserved_elements: dict[str, str] = {}
         placeholder_count: int = 0

         def _get_element_text(element: Any) -> str:
@@ -821,13 +817,13 @@
         elements = soup.find_all(recursive=False)

         def _process_element(
-            element: List[Any],
-            documents: List[Document],
-            current_headers: Dict[str, str],
-            current_content: List[str],
-            preserved_elements: Dict[str, str],
+            element: list[Any],
+            documents: list[Document],
+            current_headers: dict[str, str],
+            current_content: list[str],
+            preserved_elements: dict[str, str],
             placeholder_count: int,
-        ) -> Tuple[List[Document], Dict[str, str], List[str], Dict[str, str], int]:
+        ) -> tuple[list[Document], dict[str, str], list[str], dict[str, str], int]:
             for elem in element:
                 if elem.name.lower() in ["html", "body", "div", "main"]:
                     children = elem.find_all(recursive=False)
@@ -910,7 +906,7 @@
     def _create_documents(
         self, headers: dict[str, str], content: str, preserved_elements: dict[str, str]
-    ) -> List[Document]:
+    ) -> list[Document]:
         """Creates Document objects from the provided headers, content, and elements.

         Args:
@@ -936,7 +932,7 @@
     def _further_split_chunk(
         self, content: str, metadata: dict[Any, Any], preserved_elements: dict[str, str]
-    ) -> List[Document]:
+    ) -> list[Document]:
         """Further splits the content into smaller chunks.

         Args:
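One change in this file is behavior-neutral rather than a typing rewrite: UP015 (redundant open mode) drops the explicit `"r"`, which is already the default mode for `open`. A quick equivalence check, assuming some local `example.html` exists:

```python
# "r" is open()'s default mode, so UP015 removes the redundant argument.
with open("example.html", "r", encoding="utf-8") as f_old:
    before = f_old.read()
with open("example.html", encoding="utf-8") as f_new:
    after = f_new.read()
assert before == after
```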

View File

@@ -2,7 +2,7 @@ from __future__ import annotations

 import copy
 import json
-from typing import Any, Dict, List, Optional
+from typing import Any, Optional

 from langchain_core.documents import Document

@@ -123,10 +123,10 @@ class RecursiveJsonSplitter:

     def split_text(
         self,
-        json_data: Dict[str, Any],
+        json_data: dict[str, Any],
         convert_lists: bool = False,
         ensure_ascii: bool = True,
-    ) -> List[str]:
+    ) -> list[str]:
         """Splits JSON into a list of JSON formatted strings."""

         chunks = self.split_json(json_data=json_data, convert_lists=convert_lists)

View File

@@ -1,5 +1,5 @@
 import re
-from typing import Any, List, Optional
+from typing import Any, Optional

 from langchain_text_splitters import RecursiveCharacterTextSplitter

@@ -23,7 +23,7 @@ class JSFrameworkTextSplitter(RecursiveCharacterTextSplitter):

     def __init__(
         self,
-        separators: Optional[List[str]] = None,
+        separators: Optional[list[str]] = None,
         chunk_size: int = 2000,
         chunk_overlap: int = 0,
         **kwargs: Any,
@@ -39,7 +39,7 @@
         super().__init__(chunk_size=chunk_size, chunk_overlap=chunk_overlap, **kwargs)
         self._separators = separators or []

-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split text into chunks.

         This method splits the text into chunks by:

View File

@@ -1,6 +1,6 @@
 from __future__ import annotations

-from typing import Any, List
+from typing import Any

 from langchain_text_splitters.base import TextSplitter

@@ -30,7 +30,7 @@ class KonlpyTextSplitter(TextSplitter):
             )
         self.kkma = konlpy.tag.Kkma()

-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split incoming text and return chunks."""
         splits = self.kkma.sentences(text)
         return self._merge_splits(splits, self._separator)

View File

@@ -1,7 +1,7 @@
 from __future__ import annotations

 import re
-from typing import Any, Dict, List, Tuple, TypedDict, Union
+from typing import Any, TypedDict, Union

 from langchain_core.documents import Document

@@ -23,7 +23,7 @@ class MarkdownHeaderTextSplitter:

     def __init__(
         self,
-        headers_to_split_on: List[Tuple[str, str]],
+        headers_to_split_on: list[tuple[str, str]],
         return_each_line: bool = False,
         strip_headers: bool = True,
     ):
@@ -44,13 +44,13 @@
         # Strip headers split headers from the content of the chunk
         self.strip_headers = strip_headers

-    def aggregate_lines_to_chunks(self, lines: List[LineType]) -> List[Document]:
+    def aggregate_lines_to_chunks(self, lines: list[LineType]) -> list[Document]:
         """Combine lines with common metadata into chunks.

         Args:
             lines: Line of text / associated header metadata
         """
-        aggregated_chunks: List[LineType] = []
+        aggregated_chunks: list[LineType] = []

         for line in lines:
             if (
@@ -87,7 +87,7 @@
             for chunk in aggregated_chunks
         ]

-    def split_text(self, text: str) -> List[Document]:
+    def split_text(self, text: str) -> list[Document]:
         """Split markdown file.

         Args:
@@ -96,14 +96,14 @@
         # Split the input text by newline character ("\n").
         lines = text.split("\n")
         # Final output
-        lines_with_metadata: List[LineType] = []
+        lines_with_metadata: list[LineType] = []
         # Content and metadata of the chunk currently being processed
-        current_content: List[str] = []
-        current_metadata: Dict[str, str] = {}
+        current_content: list[str] = []
+        current_metadata: dict[str, str] = {}
         # Keep track of the nested header structure
         # header_stack: List[Dict[str, Union[int, str]]] = []
-        header_stack: List[HeaderType] = []
-        initial_metadata: Dict[str, str] = {}
+        header_stack: list[HeaderType] = []
+        initial_metadata: dict[str, str] = {}

         in_code_block = False
         opening_fence = ""
@@ -217,7 +217,7 @@
 class LineType(TypedDict):
     """Line type as typed dict."""

-    metadata: Dict[str, str]
+    metadata: dict[str, str]
     content: str

@@ -280,7 +280,7 @@ class ExperimentalMarkdownSyntaxTextSplitter:

     def __init__(
         self,
-        headers_to_split_on: Union[List[Tuple[str, str]], None] = None,
+        headers_to_split_on: Union[list[tuple[str, str]], None] = None,
         return_each_line: bool = False,
         strip_headers: bool = True,
     ):
@@ -300,9 +300,9 @@
                 Whether to exclude headers from the resulting chunks.
                 Defaults to True.
         """
-        self.chunks: List[Document] = []
+        self.chunks: list[Document] = []
         self.current_chunk = Document(page_content="")
-        self.current_header_stack: List[Tuple[int, str]] = []
+        self.current_header_stack: list[tuple[int, str]] = []
         self.strip_headers = strip_headers
         if headers_to_split_on:
             self.splittable_headers = dict(headers_to_split_on)
@@ -311,7 +311,7 @@
         self.return_each_line = return_each_line

-    def split_text(self, text: str) -> List[Document]:
+    def split_text(self, text: str) -> list[Document]:
         """Split the input text into structured chunks.

         This method processes the input text line by line, identifying and handling
@@ -382,7 +382,7 @@
                     break
             self.current_header_stack.append((header_depth, header_text))

-    def _resolve_code_chunk(self, current_line: str, raw_lines: List[str]) -> str:
+    def _resolve_code_chunk(self, current_line: str, raw_lines: list[str]) -> str:
         chunk = current_line
         while raw_lines:
             raw_line = raw_lines.pop(0)

View File

@@ -1,6 +1,6 @@
 from __future__ import annotations

-from typing import Any, List
+from typing import Any

 from langchain_text_splitters.base import TextSplitter

@@ -35,7 +35,7 @@ class NLTKTextSplitter(TextSplitter):
                 "NLTK is not installed, please install it with `pip install nltk`."
             )

-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split incoming text and return chunks."""
         # First we naively split the large input into a bunch of smaller ones.
         if self._use_span_tokenize:

View File

@@ -1,6 +1,6 @@
 from __future__ import annotations

-from typing import Any, List, Optional, cast
+from typing import Any, Optional, cast

 from langchain_text_splitters.base import TextSplitter, Tokenizer, split_text_on_tokens

@@ -50,7 +50,7 @@ class SentenceTransformersTokenTextSplitter(TextSplitter):
                 f" > maximum token limit."
             )

-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Splits the input text into smaller components by splitting text on tokens.

         This method encodes the input text using a private `_encode` method, then
@@ -65,7 +65,7 @@ class SentenceTransformersTokenTextSplitter(TextSplitter):
             encoding and processing.
         """

-        def encode_strip_start_and_stop_token_ids(text: str) -> List[int]:
+        def encode_strip_start_and_stop_token_ids(text: str) -> list[int]:
            return self._encode(text)[1:-1]

        tokenizer = Tokenizer(

View File

@@ -1,6 +1,6 @@
 from __future__ import annotations

-from typing import Any, List
+from typing import Any

 from langchain_text_splitters.base import TextSplitter

@@ -31,7 +31,7 @@ class SpacyTextSplitter(TextSplitter):
         self._separator = separator
         self._strip_whitespace = strip_whitespace

-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split incoming text and return chunks."""
         splits = (
             s.text if self._strip_whitespace else s.text_with_ws

View File

@@ -61,8 +61,8 @@ ignore_missing_imports = "True"
 target-version = "py39"

 [tool.ruff.lint]
-select = ["E", "F", "I", "PGH003", "T201", "D"]
-ignore = ["D100"]
+select = ["E", "F", "I", "UP", "PGH003", "T201", "D"]
+ignore = ["D100", "UP007"]

 [tool.coverage.run]
 omit = ["tests/*"]
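Ignoring UP007 (non-PEP 604 annotation) is what keeps the `Optional[...]`/`Union[...]` spellings intact throughout the diff above. If it were enabled, ruff would rewrite them to PEP 604 unions, roughly as sketched here (hypothetical helpers, fine on the py39 target thanks to `from __future__ import annotations`):

```python
from __future__ import annotations

from typing import Optional, Union

# Kept as-is under ignore = ["UP007"]:
def join_a(docs: list[str], sep: str) -> Optional[str]: ...
def conf_a(allowed: Union[str, set[str]]) -> None: ...

# What UP007 would produce if enabled:
def join_b(docs: list[str], sep: str) -> str | None: ...
def conf_b(allowed: str | set[str]) -> None: ...
```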

View File

@@ -1,7 +1,7 @@
 """Configuration for unit tests."""

+from collections.abc import Sequence
 from importlib import util
-from typing import Dict, Sequence

 import pytest
 from pytest import Config, Function, Parser

@@ -39,7 +39,7 @@ def pytest_collection_modifyitems(config: Config, items: Sequence[Function]) ->
     """
     # Mapping from the name of a package to whether it is installed or not.
     # Used to avoid repeated calls to `util.find_spec`
-    required_pkgs_info: Dict[str, bool] = {}
+    required_pkgs_info: dict[str, bool] = {}

     only_extended = config.getoption("--only-extended") or False
     only_core = config.getoption("--only-core") or False

View File

@@ -3,7 +3,7 @@
 import random
 import re
 import string
-from typing import Any, Callable, List, Tuple
+from typing import Any, Callable

 import pytest
 from langchain_core.documents import Document

@@ -282,7 +282,7 @@ def test_create_documents_with_metadata() -> None:
     ],
 )
 def test_create_documents_with_start_index(
-    splitter: TextSplitter, text: str, expected_docs: List[Document]
+    splitter: TextSplitter, text: str, expected_docs: list[Document]
 ) -> None:
     """Test create documents method."""
     docs = splitter.create_documents([text])

@@ -333,7 +333,7 @@ def test_iterative_text_splitter_discard_separator() -> None:
     ]


-def __test_iterative_text_splitter(chunk_size: int, keep_separator: bool) -> List[str]:
+def __test_iterative_text_splitter(chunk_size: int, keep_separator: bool) -> list[str]:
     chunk_size += 1 if keep_separator else 0
     splitter = RecursiveCharacterTextSplitter(

@@ -2224,7 +2224,7 @@ def test_haskell_code_splitter() -> None:
 @pytest.fixture
 @pytest.mark.requires("bs4")
 def html_header_splitter_splitter_factory() -> Callable[
-    [List[Tuple[str, str]]], HTMLHeaderTextSplitter
+    [list[tuple[str, str]]], HTMLHeaderTextSplitter
 ]:
     """
     Fixture to create an HTMLHeaderTextSplitter instance with given headers.

@@ -2232,7 +2232,7 @@ def html_header_splitter_splitter_factory() -> Callable[
     """

     def _create_splitter(
-        headers_to_split_on: List[Tuple[str, str]],
+        headers_to_split_on: list[tuple[str, str]],
     ) -> HTMLHeaderTextSplitter:
         return HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)

@@ -2426,9 +2426,9 @@
 @pytest.mark.requires("bs4")
 def test_html_header_text_splitter(
     html_header_splitter_splitter_factory: Any,
-    headers_to_split_on: List[Tuple[str, str]],
+    headers_to_split_on: list[tuple[str, str]],
     html_input: str,
-    expected_documents: List[Document],
+    expected_documents: list[Document],
     test_case: str,
 ) -> None:
     """

@@ -2582,9 +2582,9 @@ def test_html_header_text_splitter(
 @pytest.mark.requires("bs4")
 def test_additional_html_header_text_splitter(
     html_header_splitter_splitter_factory: Any,
-    headers_to_split_on: List[Tuple[str, str]],
+    headers_to_split_on: list[tuple[str, str]],
     html_content: str,
-    expected_output: List[Document],
+    expected_output: list[Document],
     test_case: str,
 ) -> None:
     """

@@ -2653,9 +2653,9 @@ def test_additional_html_header_text_splitter(
 @pytest.mark.requires("bs4")
 def test_html_no_headers_with_multiple_splitters(
     html_header_splitter_splitter_factory: Any,
-    headers_to_split_on: List[Tuple[str, str]],
+    headers_to_split_on: list[tuple[str, str]],
     html_content: str,
-    expected_output: List[Document],
+    expected_output: list[Document],
     test_case: str,
 ) -> None:
     """

@@ -3572,7 +3572,7 @@ def test_character_text_splitter_chunk_size_effect(
     is_regex: bool,
     text: str,
     chunk_size: int,
-    expected: List[str],
+    expected: list[str],
 ) -> None:
     splitter = CharacterTextSplitter(
         separator=separator,