diff --git a/add_batch_docstring.py b/add_batch_docstring.py
deleted file mode 100644
index 465670f7cd8..00000000000
--- a/add_batch_docstring.py
+++ /dev/null
@@ -1,161 +0,0 @@
-#!/usr/bin/env python3
-"""Script to add batch API documentation to ChatOpenAI class docstring."""
-
-import re
-
-def add_batch_docstring():
-    file_path = '/home/daytona/langchain/libs/partners/openai/langchain_openai/chat_models/base.py'
-
-    # Read the base.py file
-    with open(file_path, 'r') as f:
-        content = f.read()
-
-    # Find the location to insert the batch API documentation (before the closing docstring)
-    pattern = r'(\s+)""" # noqa: E501'
-    match = re.search(pattern, content)
-
-    if not match:
-        print("Could not find insertion point")
-        return False
-
-    indent = match.group(1)
-    insert_pos = match.start()
-
-    # Define the batch API documentation to insert
-    batch_docs = f'''
-    .. dropdown:: Batch API for cost savings
-
-        .. versionadded:: 0.3.7
-
-        OpenAI's Batch API provides **50% cost savings** for non-real-time workloads by
-        processing requests asynchronously. This is ideal for tasks like data processing,
-        content generation, or evaluation that don't require immediate responses.
-
-        **Cost vs Latency Tradeoff:**
-
-        - **Standard API**: Immediate results, full pricing
-        - **Batch API**: 50% cost savings, asynchronous processing (results available within 24 hours)
-
-        **Method 1: Direct batch management**
-
-        Use ``batch_create()`` and ``batch_retrieve()`` for full control over batch lifecycle:
-
-        .. code-block:: python
-
-            from langchain_openai import ChatOpenAI
-            from langchain_core.messages import HumanMessage
-
-            llm = ChatOpenAI(model="gpt-3.5-turbo")
-
-            # Prepare multiple message sequences for batch processing
-            messages_list = [
-                [HumanMessage(content="Translate 'hello' to French")],
-                [HumanMessage(content="Translate 'goodbye' to Spanish")],
-                [HumanMessage(content="What is the capital of Italy?")],
-            ]
-
-            # Create batch job (returns immediately with batch ID)
-            batch_id = llm.batch_create(
-                messages_list=messages_list,
-                description="Translation and geography batch",
-                metadata={{"project": "multilingual_qa", "user": "analyst_1"}},
-            )
-            print(f"Batch created: {{batch_id}}")
-
-            # Later, retrieve results (polls until completion)
-            results = llm.batch_retrieve(
-                batch_id=batch_id,
-                poll_interval=60.0,  # Check every minute
-                timeout=3600.0,  # 1 hour timeout
-            )
-
-            # Process results
-            for i, result in enumerate(results):
-                response = result.generations[0].message.content
-                print(f"Response {{i+1}}: {{response}}")
-
-        **Method 2: Enhanced batch() method**
-
-        Use the familiar ``batch()`` method with ``use_batch_api=True`` for seamless integration:
-
-        .. code-block:: python
-
-            # Standard batch processing (immediate, full cost)
-            inputs = [
-                [HumanMessage(content="What is 2+2?")],
-                [HumanMessage(content="What is 3+3?")],
-            ]
-            standard_results = llm.batch(inputs)  # Default: use_batch_api=False
-
-            # Batch API processing (50% cost savings, polling)
-            batch_results = llm.batch(
-                inputs,
-                use_batch_api=True,  # Enable cost savings
-                poll_interval=30.0,  # Poll every 30 seconds
-                timeout=1800.0,  # 30 minute timeout
-            )
-
-        **Batch creation with custom parameters:**
-
-        .. code-block:: python
-
-            # Create batch with specific model parameters
-            batch_id = llm.batch_create(
-                messages_list=messages_list,
-                description="Creative writing batch",
-                metadata={{"task_type": "content_generation"}},
-                temperature=0.8,  # Higher creativity
-                max_tokens=200,  # Longer responses
-                top_p=0.9,  # Nucleus sampling
-            )
-
-        **Error handling and monitoring:**
-
-        .. code-block:: python
-
-            from langchain_openai.chat_models.batch import BatchError
-
-            try:
-                batch_id = llm.batch_create(messages_list)
-                results = llm.batch_retrieve(batch_id, timeout=600.0)
-            except BatchError as e:
-                print(f"Batch processing failed: {{e}}")
-                # Handle batch failure (retry, fallback to standard API, etc.)
-
-        **Best practices:**
-
-        - Use batch API for **non-urgent tasks** where 50% cost savings justify longer wait times
-        - Set appropriate **timeouts** based on batch size (larger batches take longer)
-        - Include **descriptive metadata** for tracking and debugging batch jobs
-        - Consider **fallback strategies** for time-sensitive applications
-        - Monitor batch status for **long-running jobs** to detect failures early
-
-        **When to use Batch API:**
-
-        ✅ **Good for:**
-
-        - Data processing and analysis
-        - Content generation at scale
-        - Model evaluation and testing
-        - Batch translation or summarization
-        - Non-interactive applications
-
-        ❌ **Not suitable for:**
-
-        - Real-time chat applications
-        - Interactive user interfaces
-        - Time-critical decision making
-        - Applications requiring immediate responses
-
-{indent}'''
-
-    # Insert the batch API documentation
-    new_content = content[:insert_pos] + batch_docs + content[insert_pos:]
-
-    # Write back to file
-    with open(file_path, 'w') as f:
-        f.write(new_content)
-
-    print('Successfully added batch API documentation to ChatOpenAI class docstring')
-    return True
-
-if __name__ == "__main__":
-    add_batch_docstring()
diff --git a/add_batch_methods.py b/add_batch_methods.py
deleted file mode 100644
index 8d9b3bec54e..00000000000
--- a/add_batch_methods.py
+++ /dev/null
@@ -1,166 +0,0 @@
-#!/usr/bin/env python3
-"""Script to add batch_create and batch_retrieve methods to BaseChatOpenAI class."""
-
-import re
-
-def add_batch_methods():
-    file_path = '/home/daytona/langchain/libs/partners/openai/langchain_openai/chat_models/base.py'
-
-    # Read the base.py file
-    with open(file_path, 'r') as f:
-        content = f.read()
-
-    # Find the location to insert the methods (before _get_generation_chunk_from_completion)
-    pattern = r'(\s+)def _get_generation_chunk_from_completion\('
-    match = re.search(pattern, content)
-
-    if not match:
-        print("Could not find insertion point")
-        return False
-
-    indent = match.group(1)
-    insert_pos = match.start()
-
-    # Define the methods to insert
-    methods = f'''
-    def batch_create(
-        self,
-        messages_list: List[List[BaseMessage]],
-        *,
-        description: Optional[str] = None,
-        metadata: Optional[Dict[str, str]] = None,
-        poll_interval: float = 10.0,
-        timeout: Optional[float] = None,
-        **kwargs: Any,
-    ) -> str:
-        """
-        Create a batch job using OpenAI's Batch API for asynchronous processing.
-
-        This method provides 50% cost savings compared to the standard API in exchange
-        for asynchronous processing with polling for results.
-
-        Args:
-            messages_list: List of message sequences to process in batch.
-            description: Optional description for the batch job.
-            metadata: Optional metadata to attach to the batch job.
-            poll_interval: Default time in seconds between status checks when polling.
-            timeout: Default maximum time in seconds to wait for completion.
-            **kwargs: Additional parameters to pass to chat completions.
-
-        Returns:
-            The batch ID for tracking the asynchronous job.
-
-        Raises:
-            BatchError: If batch creation fails.
-
-        Example:
-            .. code-block:: python
-
-                from langchain_openai import ChatOpenAI
-                from langchain_core.messages import HumanMessage
-
-                llm = ChatOpenAI()
-                messages_list = [
-                    [HumanMessage(content="What is 2+2?")],
-                    [HumanMessage(content="What is the capital of France?")],
-                ]
-
-                # Create batch job (50% cost savings)
-                batch_id = llm.batch_create(messages_list)
-
-                # Later, retrieve results
-                results = llm.batch_retrieve(batch_id)
-        """
-        # Import here to avoid circular imports
-        from langchain_openai.chat_models.batch import OpenAIBatchProcessor
-
-        # Create batch processor with current model settings
-        processor = OpenAIBatchProcessor(
-            client=self.root_client,
-            model=self.model_name,
-            poll_interval=poll_interval,
-            timeout=timeout,
-        )
-
-        # Filter and prepare kwargs for batch processing
-        batch_kwargs = self._get_invocation_params(**kwargs)
-        # Remove model from kwargs since it's handled by the processor
-        batch_kwargs.pop("model", None)
-
-        return processor.create_batch(
-            messages_list=messages_list,
-            description=description,
-            metadata=metadata,
-            **batch_kwargs,
-        )
-
-    def batch_retrieve(
-        self,
-        batch_id: str,
-        *,
-        poll_interval: Optional[float] = None,
-        timeout: Optional[float] = None,
-    ) -> List[ChatResult]:
-        """
-        Retrieve results from a batch job, polling until completion if necessary.
-
-        This method will poll the batch status until completion and return the results
-        converted to LangChain ChatResult format.
-
-        Args:
-            batch_id: The batch ID returned from batch_create().
-            poll_interval: Time in seconds between status checks. Uses default if None.
-            timeout: Maximum time in seconds to wait. Uses default if None.
-
-        Returns:
-            List of ChatResult objects corresponding to the original message sequences.
-
-        Raises:
-            BatchError: If batch retrieval fails, times out, or batch job failed.
-
-        Example:
-            .. code-block:: python
-
-                # After creating a batch job
-                batch_id = llm.batch_create(messages_list)
-
-                # Retrieve results (will poll until completion)
-                results = llm.batch_retrieve(batch_id)
-
-                for result in results:
-                    print(result.generations[0].message.content)
-        """
-        # Import here to avoid circular imports
-        from langchain_openai.chat_models.batch import OpenAIBatchProcessor
-
-        # Create batch processor with current model settings
-        processor = OpenAIBatchProcessor(
-            client=self.root_client,
-            model=self.model_name,
-            poll_interval=poll_interval or 10.0,
-            timeout=timeout,
-        )
-
-        # Poll for completion and retrieve results
-        processor.poll_batch_status(
-            batch_id=batch_id,
-            poll_interval=poll_interval,
-            timeout=timeout,
-        )
-
-        return processor.retrieve_batch_results(batch_id)
-
-{indent}'''
-
-    # Insert the methods
-    new_content = content[:insert_pos] + methods + content[insert_pos:]
-
-    # Write back to file
-    with open(file_path, 'w') as f:
-        f.write(new_content)
-
-    print('Successfully added batch_create and batch_retrieve methods to BaseChatOpenAI class')
-    return True
-
-if __name__ == "__main__":
-    add_batch_methods()
diff --git a/add_batch_override.py b/add_batch_override.py
deleted file mode 100644
index 8d9d2d11ac8..00000000000
--- a/add_batch_override.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#!/usr/bin/env python3
-"""Script to add batch() method override to BaseChatOpenAI class."""
-
-import re
-
-def add_batch_override():
-    file_path = '/home/daytona/langchain/libs/partners/openai/langchain_openai/chat_models/base.py'
-
-    # Read the base.py file
-    with open(file_path, 'r') as f:
-        content = f.read()
-
-    # Find the location to insert the method (after batch_retrieve method)
-    pattern = r'(\s+)def _get_generation_chunk_from_completion\('
-    match = re.search(pattern, content)
-
-    if not match:
-        print("Could not find insertion point")
-        return False
-
-    indent = match.group(1)
-    insert_pos = match.start()
-
-    # Define the method to insert
-    method = f'''
-    @override
-    def batch(
-        self,
-        inputs: List[LanguageModelInput],
-        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
-        *,
-        return_exceptions: bool = False,
-        use_batch_api: bool = False,
-        **kwargs: Any,
-    ) -> List[BaseMessage]:
-        """
-        Batch process multiple inputs using either standard API or OpenAI Batch API.
-
-        This method provides two processing modes:
-        1. Standard mode (use_batch_api=False): Uses parallel invoke for immediate results
-        2. Batch API mode (use_batch_api=True): Uses OpenAI's Batch API for 50% cost savings
-
-        Args:
-            inputs: List of inputs to process in batch.
-            config: Configuration for the batch processing.
-            return_exceptions: Whether to return exceptions instead of raising them.
-            use_batch_api: If True, use OpenAI's Batch API for cost savings with polling.
-                If False (default), use standard parallel processing for immediate results.
-            **kwargs: Additional parameters to pass to the underlying model.
-
-        Returns:
-            List of BaseMessage objects corresponding to the inputs.
-
-        Raises:
-            BatchError: If batch processing fails (when use_batch_api=True).
-
-        Note:
-            **Cost vs Latency Tradeoff:**
-
-            - use_batch_api=False: Immediate results, standard API pricing
-            - use_batch_api=True: 50% cost savings, asynchronous processing with polling
-
-        Example:
-            .. code-block:: python
-
-                from langchain_openai import ChatOpenAI
-                from langchain_core.messages import HumanMessage
-
-                llm = ChatOpenAI()
-                inputs = [
-                    [HumanMessage(content="What is 2+2?")],
-                    [HumanMessage(content="What is the capital of France?")],
-                ]
-
-                # Standard processing (immediate results)
-                results = llm.batch(inputs)
-
-                # Batch API processing (50% cost savings, polling required)
-                results = llm.batch(inputs, use_batch_api=True)
-        """
-        if use_batch_api:
-            # Convert inputs to messages_list format expected by batch_create
-            messages_list = []
-            for input_item in inputs:
-                if isinstance(input_item, list):
-                    # Already a list of messages
-                    messages_list.append(input_item)
-                else:
-                    # Convert single input to list of messages
-                    messages = self._convert_input_to_messages(input_item)
-                    messages_list.append(messages)
-
-            # Create batch job and poll for results
-            batch_id = self.batch_create(messages_list, **kwargs)
-            chat_results = self.batch_retrieve(batch_id)
-
-            # Convert ChatResult objects to BaseMessage objects
-            return [result.generations[0].message for result in chat_results]
-        else:
-            # Use the parent class's standard batch implementation
-            return super().batch(
                inputs=inputs,
-                config=config,
-                return_exceptions=return_exceptions,
-                **kwargs,
-            )
-
-    def _convert_input_to_messages(self, input_item: LanguageModelInput) -> List[BaseMessage]:
-        """Convert various input formats to a list of BaseMessage objects."""
-        if isinstance(input_item, list):
-            # Already a list of messages
-            return input_item
-        elif isinstance(input_item, BaseMessage):
-            # Single message
-            return [input_item]
-        elif isinstance(input_item, str):
-            # String input - convert to HumanMessage
-            from langchain_core.messages import HumanMessage
-            return [HumanMessage(content=input_item)]
-        elif hasattr(input_item, 'to_messages'):
-            # PromptValue or similar
-            return input_item.to_messages()
-        else:
-            # Try to convert to string and then to HumanMessage
-            from langchain_core.messages import HumanMessage
-            return [HumanMessage(content=str(input_item))]
-
-{indent}'''
-
-    # Insert the method
-    new_content = content[:insert_pos] + method + content[insert_pos:]
-
-    # Write back to file
-    with open(file_path, 'w') as f:
-        f.write(new_content)
-
-    print('Successfully added batch() method override to BaseChatOpenAI class')
-    return True
-
-if __name__ == "__main__":
-    add_batch_override()