Mirror of https://github.com/hwchase17/langchain.git, synced 2025-12-26 17:36:35 +00:00
fix(openai): Respect 300k token limit for embeddings API requests (#33668)
## Description

Fixes #31227 - resolves the issue where `OpenAIEmbeddings` exceeds OpenAI's 300,000-token-per-request limit, causing 400 BadRequest errors.

## Problem

When embedding large document sets, LangChain would send batches containing more than 300,000 tokens in a single API request, causing this error:

```
openai.BadRequestError: Error code: 400 - {'error': {'message': 'Requested 673477 tokens, max 300000 tokens per request'}}
```

The issue occurred because:

- The code chunks texts by `embedding_ctx_length` (8191 tokens per chunk)
- It then batches chunks by `chunk_size` (default 1000 chunks per request)
- **But it didn't check** the total tokens per batch against OpenAI's 300k limit
- Result: `1000 chunks × 8191 tokens = 8,191,000 tokens` → exceeds the limit

## Solution

This PR implements dynamic batching that respects the 300k token limit (a minimal sketch of the batching loop follows the description):

1. **Added constant**: `MAX_TOKENS_PER_REQUEST = 300000`
2. **Track token counts**: Calculate the actual token count for each chunk
3. **Dynamic batching**: Instead of fixed `chunk_size` batches, accumulate chunks until approaching the 300k limit
4. **Applied to both sync and async**: Fixed both `_get_len_safe_embeddings` and `_aget_len_safe_embeddings`

## Changes

- Modified `langchain_openai/embeddings/base.py`:
  - Added the `MAX_TOKENS_PER_REQUEST` constant
  - Replaced fixed-size batching with token-aware dynamic batching
  - Applied the change to both the sync (line ~478) and async (line ~527) methods
- Added a test in `tests/unit_tests/embeddings/test_base.py`:
  - `test_embeddings_respects_token_limit()` verifies that large document sets are properly batched

## Testing

All existing tests pass (280 passed, 4 xfailed, 1 xpassed). The new test verifies that:

- Large document sets (500 texts × 1000 tokens = 500k tokens) are split into multiple API calls
- Each API call respects the 300k token limit

## Usage

After this fix, users can embed large document sets without errors:

```python
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_text_splitters import CharacterTextSplitter

# This will now work without exceeding token limits
embeddings = OpenAIEmbeddings()
documents = CharacterTextSplitter().split_documents(large_documents)
Chroma.from_documents(documents, embeddings)
```

Resolves #31227

---------

Co-authored-by: Kaparthy Reddy <kaparthyreddy@Kaparthys-MacBook-Air.local>
Co-authored-by: Chester Curme <chester.curme@gmail.com>
Co-authored-by: Mason Daugherty <mason@langchain.dev>
Co-authored-by: Mason Daugherty <github@mdrxy.com>
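To make the dynamic batching concrete, here is a minimal standalone sketch of the accumulation loop referenced in the Solution section. The helper name `batch_by_token_limit` and its signature are illustrative only and not part of this PR; the loop mirrors the logic added to `_get_len_safe_embeddings` in the diff below.

```python
# Illustrative sketch only: `batch_by_token_limit` is a hypothetical helper,
# not an identifier introduced by this PR. It mirrors the token-aware
# batching loop added to `_get_len_safe_embeddings`.
MAX_TOKENS_PER_REQUEST = 300000


def batch_by_token_limit(
    tokens: list[list[int]], chunk_size: int = 1000
) -> list[list[list[int]]]:
    """Group per-chunk token lists into batches that stay under the request limit."""
    token_counts = [len(chunk) for chunk in tokens]
    batches: list[list[list[int]]] = []
    i = 0
    while i < len(tokens):
        batch_token_count = 0
        batch_end = i
        for j in range(i, min(i + chunk_size, len(tokens))):
            if batch_token_count + token_counts[j] > MAX_TOKENS_PER_REQUEST:
                if batch_end == i:
                    # A single chunk larger than the limit is still sent on its own.
                    batch_end = j + 1
                break
            batch_token_count += token_counts[j]
            batch_end = j + 1
        batches.append(tokens[i:batch_end])
        i = batch_end
    return batches


# 500 chunks of 1000 tokens each split into two requests of <= 300k tokens:
print([sum(map(len, b)) for b in batch_by_token_limit([[0] * 1000 for _ in range(500)])])
# -> [300000, 200000]
```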
```diff
@@ -19,6 +19,9 @@ from langchain_openai.chat_models._client_utils import _resolve_sync_and_async_a
 
 logger = logging.getLogger(__name__)
 
+MAX_TOKENS_PER_REQUEST = 300000
+"""API limit per request for embedding tokens."""
+
 
 def _process_batched_chunked_embeddings(
     num_texts: int,
```
```diff
@@ -524,9 +527,9 @@ class OpenAIEmbeddings(BaseModel, Embeddings):
     ) -> list[list[float]]:
         """Generate length-safe embeddings for a list of texts.
 
-        This method handles tokenization and embedding generation, respecting the
-        set embedding context length and chunk size. It supports both tiktoken
-        and HuggingFace tokenizer based on the tiktoken_enabled flag.
+        This method handles tokenization and embedding generation, respecting the set
+        embedding context length and chunk size. It supports both tiktoken and
+        HuggingFace tokenizer based on the tiktoken_enabled flag.
 
         Args:
             texts: A list of texts to embed.
```
```diff
@@ -540,14 +543,38 @@ class OpenAIEmbeddings(BaseModel, Embeddings):
         client_kwargs = {**self._invocation_params, **kwargs}
         _iter, tokens, indices = self._tokenize(texts, _chunk_size)
         batched_embeddings: list[list[float]] = []
-        for i in _iter:
-            response = self.client.create(
-                input=tokens[i : i + _chunk_size], **client_kwargs
-            )
+        # Calculate token counts per chunk
+        token_counts = [
+            len(t) if isinstance(t, list) else len(t.split()) for t in tokens
+        ]
+
+        # Process in batches respecting the token limit
+        i = 0
+        while i < len(tokens):
+            # Determine how many chunks we can include in this batch
+            batch_token_count = 0
+            batch_end = i
+
+            for j in range(i, min(i + _chunk_size, len(tokens))):
+                chunk_tokens = token_counts[j]
+                # Check if adding this chunk would exceed the limit
+                if batch_token_count + chunk_tokens > MAX_TOKENS_PER_REQUEST:
+                    if batch_end == i:
+                        # Single chunk exceeds limit - handle it anyway
+                        batch_end = j + 1
+                    break
+                batch_token_count += chunk_tokens
+                batch_end = j + 1
+
+            # Make API call with this batch
+            batch_tokens = tokens[i:batch_end]
+            response = self.client.create(input=batch_tokens, **client_kwargs)
             if not isinstance(response, dict):
                 response = response.model_dump()
             batched_embeddings.extend(r["embedding"] for r in response["data"])
+
+            i = batch_end
 
         embeddings = _process_batched_chunked_embeddings(
             len(texts), tokens, batched_embeddings, indices, self.skip_empty
         )
```
```diff
@@ -594,15 +621,40 @@ class OpenAIEmbeddings(BaseModel, Embeddings):
             None, self._tokenize, texts, _chunk_size
         )
         batched_embeddings: list[list[float]] = []
-        for i in range(0, len(tokens), _chunk_size):
-            response = await self.async_client.create(
-                input=tokens[i : i + _chunk_size], **client_kwargs
-            )
+        # Calculate token counts per chunk
+        token_counts = [
+            len(t) if isinstance(t, list) else len(t.split()) for t in tokens
+        ]
+
+        # Process in batches respecting the token limit
+        i = 0
+        while i < len(tokens):
+            # Determine how many chunks we can include in this batch
+            batch_token_count = 0
+            batch_end = i
+
+            for j in range(i, min(i + _chunk_size, len(tokens))):
+                chunk_tokens = token_counts[j]
+                # Check if adding this chunk would exceed the limit
+                if batch_token_count + chunk_tokens > MAX_TOKENS_PER_REQUEST:
+                    if batch_end == i:
+                        # Single chunk exceeds limit - handle it anyway
+                        batch_end = j + 1
+                    break
+                batch_token_count += chunk_tokens
+                batch_end = j + 1
+
+            # Make API call with this batch
+            batch_tokens = tokens[i:batch_end]
+            response = await self.async_client.create(
+                input=batch_tokens, **client_kwargs
+            )
             if not isinstance(response, dict):
                 response = response.model_dump()
             batched_embeddings.extend(r["embedding"] for r in response["data"])
+
+            i = batch_end
 
         embeddings = _process_batched_chunked_embeddings(
             len(texts), tokens, batched_embeddings, indices, self.skip_empty
         )
```
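For intuition about the effect of the cap added above: with the defaults cited in the description (8191-token chunks, `chunk_size=1000`), a quick back-of-envelope check shows a single request now carries at most 36 full-length chunks instead of 1000. The constant names below are illustrative, not identifiers from the codebase.

```python
# Back-of-envelope check using the numbers cited in the PR description;
# the names are illustrative, not identifiers from langchain_openai.
MAX_TOKENS_PER_REQUEST = 300_000
FULL_CHUNK_TOKENS = 8_191  # per-chunk size cited in the description

print(1000 * FULL_CHUNK_TOKENS)                     # 8191000: old worst-case batch
print(MAX_TOKENS_PER_REQUEST // FULL_CHUNK_TOKENS)  # 36: max full chunks per request now
```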
```diff
@@ -1,7 +1,9 @@
 import os
-from unittest.mock import patch
+from typing import Any
+from unittest.mock import Mock, patch
 
 import pytest
 from pydantic import SecretStr
 
 from langchain_openai import OpenAIEmbeddings
```
```diff
@@ -96,3 +98,53 @@ async def test_embed_with_kwargs_async() -> None:
         mock_create.assert_any_call(input=texts, **client_kwargs)
 
     assert result == [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
+
+
+def test_embeddings_respects_token_limit() -> None:
+    """Test that embeddings respect the 300k token per request limit."""
+    # Create embeddings instance
+    embeddings = OpenAIEmbeddings(
+        model="text-embedding-ada-002", api_key=SecretStr("test-key")
+    )
+
+    call_counts = []
+
+    def mock_create(**kwargs: Any) -> Mock:
+        input_ = kwargs["input"]
+        # Track how many tokens in this call
+        if isinstance(input_, list):
+            total_tokens = sum(
+                len(t) if isinstance(t, list) else len(t.split()) for t in input_
+            )
+            call_counts.append(total_tokens)
+            # Verify this call doesn't exceed limit
+            assert total_tokens <= 300000, (
+                f"Batch exceeded token limit: {total_tokens} tokens"
+            )
+
+        # Return mock response
+        mock_response = Mock()
+        mock_response.model_dump.return_value = {
+            "data": [
+                {"embedding": [0.1] * 1536}
+                for _ in range(len(input_) if isinstance(input_, list) else 1)
+            ]
+        }
+        return mock_response
+
+    embeddings.client.create = mock_create
+
+    # Create a scenario that would exceed 300k tokens in a single batch
+    # with default chunk_size=1000
+    # Simulate 500 texts with ~1000 tokens each = 500k tokens total
+    large_texts = ["word " * 1000 for _ in range(500)]
+
+    # This should not raise an error anymore
+    embeddings.embed_documents(large_texts)
+
+    # Verify we made multiple API calls to respect the limit
+    assert len(call_counts) > 1, "Should have split into multiple batches"
+
+    # Verify each call respected the limit
+    for count in call_counts:
+        assert count <= 300000, f"Batch exceeded limit: {count}"
```