Apply patch [skip ci]
@@ -1,161 +0,0 @@
#!/usr/bin/env python3
"""Script to add batch API documentation to the ChatOpenAI class docstring."""

import re


def add_batch_docstring():
    file_path = '/home/daytona/langchain/libs/partners/openai/langchain_openai/chat_models/base.py'

    # Read the base.py file
    with open(file_path, 'r') as f:
        content = f.read()

    # Find the location to insert the batch API documentation (before the closing docstring)
    pattern = r'(\s+)""" # noqa: E501'
    match = re.search(pattern, content)

    if not match:
        print("Could not find insertion point")
        return False

    indent = match.group(1)
    insert_pos = match.start()

    # Define the batch API documentation to insert
    batch_docs = f'''
    .. dropdown:: Batch API for cost savings

        .. versionadded:: 0.3.7

        OpenAI's Batch API provides **50% cost savings** for non-real-time workloads by
        processing requests asynchronously. This is ideal for tasks like data processing,
        content generation, or evaluation that don't require immediate responses.

        **Cost vs latency tradeoff:**

        - **Standard API**: immediate results, full pricing
        - **Batch API**: 50% cost savings, asynchronous processing (results available within 24 hours)

        **Method 1: Direct batch management**

        Use ``batch_create()`` and ``batch_retrieve()`` for full control over the batch lifecycle:

        .. code-block:: python

            from langchain_openai import ChatOpenAI
            from langchain_core.messages import HumanMessage

            llm = ChatOpenAI(model="gpt-3.5-turbo")

            # Prepare multiple message sequences for batch processing
            messages_list = [
                [HumanMessage(content="Translate 'hello' to French")],
                [HumanMessage(content="Translate 'goodbye' to Spanish")],
                [HumanMessage(content="What is the capital of Italy?")],
            ]

            # Create batch job (returns immediately with batch ID)
            batch_id = llm.batch_create(
                messages_list=messages_list,
                description="Translation and geography batch",
                metadata={{"project": "multilingual_qa", "user": "analyst_1"}},
            )
            print(f"Batch created: {{batch_id}}")

            # Later, retrieve results (polls until completion)
            results = llm.batch_retrieve(
                batch_id=batch_id,
                poll_interval=60.0,  # Check every minute
                timeout=3600.0,      # 1 hour timeout
            )

            # Process results
            for i, result in enumerate(results):
                response = result.generations[0].message.content
                print(f"Response {{i+1}}: {{response}}")

        **Method 2: Enhanced batch() method**

        Use the familiar ``batch()`` method with ``use_batch_api=True`` for seamless integration:

        .. code-block:: python

            # Standard batch processing (immediate, full cost)
            inputs = [
                [HumanMessage(content="What is 2+2?")],
                [HumanMessage(content="What is 3+3?")],
            ]
            standard_results = llm.batch(inputs)  # Default: use_batch_api=False

            # Batch API processing (50% cost savings, polling)
            batch_results = llm.batch(
                inputs,
                use_batch_api=True,  # Enable cost savings
                poll_interval=30.0,  # Poll every 30 seconds
                timeout=1800.0,      # 30 minute timeout
            )

        **Batch creation with custom parameters:**

        .. code-block:: python

            # Create batch with specific model parameters
            batch_id = llm.batch_create(
                messages_list=messages_list,
                description="Creative writing batch",
                metadata={{"task_type": "content_generation"}},
                temperature=0.8,  # Higher creativity
                max_tokens=200,   # Longer responses
                top_p=0.9,        # Nucleus sampling
            )

        **Error handling and monitoring:**

        .. code-block:: python

            from langchain_openai.chat_models.batch import BatchError

            try:
                batch_id = llm.batch_create(messages_list)
                results = llm.batch_retrieve(batch_id, timeout=600.0)
            except BatchError as e:
                print(f"Batch processing failed: {{e}}")
                # Handle batch failure (retry, fall back to the standard API, etc.)

        **Best practices:**

        - Use the Batch API for **non-urgent tasks** where 50% cost savings justify longer wait times
        - Set appropriate **timeouts** based on batch size (larger batches take longer)
        - Include **descriptive metadata** for tracking and debugging batch jobs
        - Consider **fallback strategies** for time-sensitive applications
        - Monitor batch status for **long-running jobs** to detect failures early

        **When to use the Batch API:**

        ✅ **Good for:**

        - Data processing and analysis
        - Content generation at scale
        - Model evaluation and testing
        - Batch translation or summarization
        - Non-interactive applications

        ❌ **Not suitable for:**

        - Real-time chat applications
        - Interactive user interfaces
        - Time-critical decision making
        - Applications requiring immediate responses
    {indent}'''

    # Insert the batch API documentation
    new_content = content[:insert_pos] + batch_docs + content[insert_pos:]

    # Write back to file
    with open(file_path, 'w') as f:
        f.write(new_content)

    print('Successfully added batch API documentation to ChatOpenAI class docstring')
    return True


if __name__ == "__main__":
    add_batch_docstring()
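All three helper scripts in this patch rely on the same regex-splice trick: capture the whitespace before an anchor with (\s+), then insert new text at match.start() so the anchor keeps its original indentation. A minimal standalone sketch of that trick, on a toy string that is not part of the patch:

# Standalone sketch (toy input only): the same find-anchor-then-splice
# approach add_batch_docstring() uses above.
import re

toy = 'class Foo:\n    """Docs.\n    """ # noqa: E501\n'

match = re.search(r'(\s+)""" # noqa: E501', toy)
if match:
    indent = match.group(1)      # whitespace captured before the closing quotes
    insert_pos = match.start()   # splice point: just before that whitespace
    addition = "\n    .. note:: inserted text" + indent
    patched = toy[:insert_pos] + addition + toy[insert_pos:]
    print(patched)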
@@ -1,166 +0,0 @@
#!/usr/bin/env python3
"""Script to add batch_create and batch_retrieve methods to the BaseChatOpenAI class."""

import re


def add_batch_methods():
    file_path = '/home/daytona/langchain/libs/partners/openai/langchain_openai/chat_models/base.py'

    # Read the base.py file
    with open(file_path, 'r') as f:
        content = f.read()

    # Find the location to insert the methods (before _get_generation_chunk_from_completion)
    pattern = r'(\s+)def _get_generation_chunk_from_completion\('
    match = re.search(pattern, content)

    if not match:
        print("Could not find insertion point")
        return False

    indent = match.group(1)
    insert_pos = match.start()

    # Define the methods to insert
    methods = f'''
    def batch_create(
        self,
        messages_list: List[List[BaseMessage]],
        *,
        description: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
        poll_interval: float = 10.0,
        timeout: Optional[float] = None,
        **kwargs: Any,
    ) -> str:
        """
        Create a batch job using OpenAI's Batch API for asynchronous processing.

        This method provides 50% cost savings compared to the standard API in exchange
        for asynchronous processing with polling for results.

        Args:
            messages_list: List of message sequences to process in batch.
            description: Optional description for the batch job.
            metadata: Optional metadata to attach to the batch job.
            poll_interval: Default time in seconds between status checks when polling.
            timeout: Default maximum time in seconds to wait for completion.
            **kwargs: Additional parameters to pass to chat completions.

        Returns:
            The batch ID for tracking the asynchronous job.

        Raises:
            BatchError: If batch creation fails.

        Example:
            .. code-block:: python

                from langchain_openai import ChatOpenAI
                from langchain_core.messages import HumanMessage

                llm = ChatOpenAI()
                messages_list = [
                    [HumanMessage(content="What is 2+2?")],
                    [HumanMessage(content="What is the capital of France?")],
                ]

                # Create batch job (50% cost savings)
                batch_id = llm.batch_create(messages_list)

                # Later, retrieve results
                results = llm.batch_retrieve(batch_id)
        """
        # Import here to avoid circular imports
        from langchain_openai.chat_models.batch import OpenAIBatchProcessor

        # Create batch processor with current model settings
        processor = OpenAIBatchProcessor(
            client=self.root_client,
            model=self.model_name,
            poll_interval=poll_interval,
            timeout=timeout,
        )

        # Filter and prepare kwargs for batch processing
        batch_kwargs = self._get_invocation_params(**kwargs)
        # Remove model from kwargs since it's handled by the processor
        batch_kwargs.pop("model", None)

        return processor.create_batch(
            messages_list=messages_list,
            description=description,
            metadata=metadata,
            **batch_kwargs,
        )

    def batch_retrieve(
        self,
        batch_id: str,
        *,
        poll_interval: Optional[float] = None,
        timeout: Optional[float] = None,
    ) -> List[ChatResult]:
        """
        Retrieve results from a batch job, polling until completion if necessary.

        This method will poll the batch status until completion and return the results
        converted to LangChain ChatResult format.

        Args:
            batch_id: The batch ID returned from batch_create().
            poll_interval: Time in seconds between status checks. Uses the default if None.
            timeout: Maximum time in seconds to wait. Uses the default if None.

        Returns:
            List of ChatResult objects corresponding to the original message sequences.

        Raises:
            BatchError: If batch retrieval fails, times out, or the batch job failed.

        Example:
            .. code-block:: python

                # After creating a batch job
                batch_id = llm.batch_create(messages_list)

                # Retrieve results (will poll until completion)
                results = llm.batch_retrieve(batch_id)

                for result in results:
                    print(result.generations[0].message.content)
        """
        # Import here to avoid circular imports
        from langchain_openai.chat_models.batch import OpenAIBatchProcessor

        # Create batch processor with current model settings
        processor = OpenAIBatchProcessor(
            client=self.root_client,
            model=self.model_name,
            poll_interval=poll_interval or 10.0,
            timeout=timeout,
        )

        # Poll for completion and retrieve results
        processor.poll_batch_status(
            batch_id=batch_id,
            poll_interval=poll_interval,
            timeout=timeout,
        )

        return processor.retrieve_batch_results(batch_id)
    {indent}'''

    # Insert the methods
    new_content = content[:insert_pos] + methods + content[insert_pos:]

    # Write back to file
    with open(file_path, 'w') as f:
        f.write(new_content)

    print('Successfully added batch_create and batch_retrieve methods to BaseChatOpenAI class')
    return True


if __name__ == "__main__":
    add_batch_methods()
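The OpenAIBatchProcessor that batch_create() and batch_retrieve() import from langchain_openai.chat_models.batch is not shown in this diff. As a rough, hypothetical sketch of what its poll-and-retrieve half could look like when built on the openai SDK's Batch endpoints (client.batches.retrieve, client.files.content); every name and behavior here is an assumption, not the module from the patch:

# Hypothetical sketch only: the real langchain_openai.chat_models.batch module
# is not part of this diff. One plausible poll/retrieve loop on top of the
# openai SDK's Batch endpoints.
import json
import time
from typing import List, Optional

from openai import OpenAI


def poll_batch_status(client: OpenAI, batch_id: str,
                      poll_interval: float = 10.0,
                      timeout: Optional[float] = None) -> None:
    """Block until the batch completes; raise on failure or timeout."""
    start = time.monotonic()
    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status == "completed":
            return
        if batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch_id} ended with status {batch.status!r}")
        if timeout is not None and time.monotonic() - start > timeout:
            raise TimeoutError(f"Batch {batch_id} not finished after {timeout}s")
        time.sleep(poll_interval)


def retrieve_batch_results(client: OpenAI, batch_id: str) -> List[str]:
    """Download the JSONL output file and pull out each assistant reply."""
    batch = client.batches.retrieve(batch_id)
    raw = client.files.content(batch.output_file_id).text
    replies = []
    for line in raw.splitlines():
        record = json.loads(line)
        body = record["response"]["body"]
        replies.append(body["choices"][0]["message"]["content"])
    return replies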
@@ -1,140 +0,0 @@
#!/usr/bin/env python3
"""Script to add a batch() method override to the BaseChatOpenAI class."""

import re


def add_batch_override():
    file_path = '/home/daytona/langchain/libs/partners/openai/langchain_openai/chat_models/base.py'

    # Read the base.py file
    with open(file_path, 'r') as f:
        content = f.read()

    # Find the location to insert the method (after the batch_retrieve method)
    pattern = r'(\s+)def _get_generation_chunk_from_completion\('
    match = re.search(pattern, content)

    if not match:
        print("Could not find insertion point")
        return False

    indent = match.group(1)
    insert_pos = match.start()

    # Define the method to insert
    method = f'''
    @override
    def batch(
        self,
        inputs: List[LanguageModelInput],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        use_batch_api: bool = False,
        **kwargs: Any,
    ) -> List[BaseMessage]:
        """
        Batch process multiple inputs using either the standard API or OpenAI's Batch API.

        This method provides two processing modes:

        1. Standard mode (use_batch_api=False): uses parallel invoke for immediate results.
        2. Batch API mode (use_batch_api=True): uses OpenAI's Batch API for 50% cost savings.

        Args:
            inputs: List of inputs to process in batch.
            config: Configuration for the batch processing.
            return_exceptions: Whether to return exceptions instead of raising them.
            use_batch_api: If True, use OpenAI's Batch API for cost savings with polling.
                If False (default), use standard parallel processing for immediate results.
            **kwargs: Additional parameters to pass to the underlying model.

        Returns:
            List of BaseMessage objects corresponding to the inputs.

        Raises:
            BatchError: If batch processing fails (when use_batch_api=True).

        Note:
            **Cost vs latency tradeoff:**

            - use_batch_api=False: immediate results, standard API pricing
            - use_batch_api=True: 50% cost savings, asynchronous processing with polling

        Example:
            .. code-block:: python

                from langchain_openai import ChatOpenAI
                from langchain_core.messages import HumanMessage

                llm = ChatOpenAI()
                inputs = [
                    [HumanMessage(content="What is 2+2?")],
                    [HumanMessage(content="What is the capital of France?")],
                ]

                # Standard processing (immediate results)
                results = llm.batch(inputs)

                # Batch API processing (50% cost savings, polling required)
                results = llm.batch(inputs, use_batch_api=True)
        """
        if use_batch_api:
            # Convert inputs to the messages_list format expected by batch_create
            messages_list = []
            for input_item in inputs:
                if isinstance(input_item, list):
                    # Already a list of messages
                    messages_list.append(input_item)
                else:
                    # Convert single input to a list of messages
                    messages = self._convert_input_to_messages(input_item)
                    messages_list.append(messages)

            # Create batch job and poll for results
            batch_id = self.batch_create(messages_list, **kwargs)
            chat_results = self.batch_retrieve(batch_id)

            # Convert ChatResult objects to BaseMessage objects
            return [result.generations[0].message for result in chat_results]
        else:
            # Use the parent class's standard batch implementation
            return super().batch(
                inputs=inputs,
                config=config,
                return_exceptions=return_exceptions,
                **kwargs,
            )

    def _convert_input_to_messages(self, input_item: LanguageModelInput) -> List[BaseMessage]:
        """Convert various input formats to a list of BaseMessage objects."""
        if isinstance(input_item, list):
            # Already a list of messages
            return input_item
        elif isinstance(input_item, BaseMessage):
            # Single message
            return [input_item]
        elif isinstance(input_item, str):
            # String input - convert to HumanMessage
            from langchain_core.messages import HumanMessage
            return [HumanMessage(content=input_item)]
        elif hasattr(input_item, 'to_messages'):
            # PromptValue or similar
            return input_item.to_messages()
        else:
            # Try to convert to string and then to HumanMessage
            from langchain_core.messages import HumanMessage
            return [HumanMessage(content=str(input_item))]
    {indent}'''

    # Insert the method
    new_content = content[:insert_pos] + method + content[insert_pos:]

    # Write back to file
    with open(file_path, 'w') as f:
        f.write(new_content)

    print('Successfully added batch() method override to BaseChatOpenAI class')
    return True


if __name__ == "__main__":
    add_batch_override()
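A quick, illustrative check of the branches _convert_input_to_messages() handles, assuming the patched ChatOpenAI from this change is importable; the helper makes no API call, so a placeholder key is enough:

# Illustrative only; assumes the patched ChatOpenAI with _convert_input_to_messages
# is installed. No request is sent, so a placeholder API key suffices.
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.prompt_values import ChatPromptValue
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo", api_key="sk-placeholder")

# str -> wrapped in [HumanMessage(...)]
print(llm._convert_input_to_messages("plain string"))
# single BaseMessage -> wrapped in a one-element list
print(llm._convert_input_to_messages(HumanMessage(content="hi")))
# list of messages -> returned unchanged
print(llm._convert_input_to_messages([HumanMessage(content="a"), AIMessage(content="b")]))
# PromptValue -> delegated to .to_messages()
print(llm._convert_input_to_messages(ChatPromptValue(messages=[HumanMessage(content="pv")])))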