mirror of https://github.com/hwchase17/langchain.git (synced 2025-09-03 03:59:42 +00:00)
chore(langchain): remove untested chains for first alpha (#32710)
Also removing globals.py file
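
For reference, the two chain factories removed here were used as sketched below. This is a minimal usage sketch reconstructed from the docstring examples in the deleted files; the model identifier is only illustrative.

# Minimal sketch of the removed API (reconstructed from the deleted files' docstrings).
from langchain.chains.documents import (
    create_map_reduce_chain,
    create_stuff_documents_chain,
)
from langchain_core.documents import Document

docs = [
    Document(page_content="First document content..."),
    Document(page_content="Second document content..."),
]

# Map-reduce: each document is processed in parallel, then results are combined.
map_reduce_chain = create_map_reduce_chain("anthropic:claude-sonnet-4-20250514").compile()
print(map_reduce_chain.invoke({"documents": docs})["result"])

# Stuff: all documents are placed into the context window in a single request.
stuff_chain = create_stuff_documents_chain("anthropic:claude-sonnet-4-20250514").compile()
print(stuff_chain.invoke({"documents": docs})["result"])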
@@ -1,9 +0,0 @@
from langchain.chains.documents import (
    create_map_reduce_chain,
    create_stuff_documents_chain,
)

__all__ = [
    "create_map_reduce_chain",
    "create_stuff_documents_chain",
]
@@ -1,17 +0,0 @@
"""Document extraction chains.

This module provides different strategies for extracting information from collections
of documents using LangGraph and modern language models.

Available Strategies:
- Stuff: Processes all documents together in a single context window
- Map-Reduce: Processes documents in parallel (map), then combines results (reduce)
"""

from langchain.chains.documents.map_reduce import create_map_reduce_chain
from langchain.chains.documents.stuff import create_stuff_documents_chain

__all__ = [
    "create_map_reduce_chain",
    "create_stuff_documents_chain",
]
@@ -1,580 +0,0 @@
"""Map-Reduce Extraction Implementation using LangGraph Send API."""

from __future__ import annotations

import operator
from typing import (
    TYPE_CHECKING,
    Annotated,
    Any,
    Generic,
    Literal,
    Union,
    cast,
)

from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from typing_extensions import NotRequired, TypedDict

from langchain._internal._documents import format_document_xml
from langchain._internal._prompts import aresolve_prompt, resolve_prompt
from langchain._internal._typing import ContextT, StateNode
from langchain._internal._utils import RunnableCallable
from langchain.chat_models import init_chat_model

if TYPE_CHECKING:
    from collections.abc import Callable

    from langchain_core.documents import Document
    from langchain_core.language_models.chat_models import BaseChatModel

    # Pycharm is unable to identify that AIMessage is used in the cast below
    from langchain_core.messages import (
        AIMessage,
        MessageLikeRepresentation,
    )
    from langchain_core.runnables import RunnableConfig
    from langgraph.runtime import Runtime
    from pydantic import BaseModel


class ExtractionResult(TypedDict):
    """Result from processing a document or group of documents."""

    indexes: list[int]
    """Document indexes that contributed to this result."""
    result: Any
    """Extracted result from the document(s)."""


class MapReduceState(TypedDict):
    """State for map-reduce extraction chain.

    This state tracks the map-reduce process where documents are processed
    in parallel during the map phase, then combined in the reduce phase.
    """

    documents: list[Document]
    """List of documents to process."""
    map_results: Annotated[list[ExtractionResult], operator.add]
    """Individual results from the map phase."""
    result: NotRequired[Any]
    """Final combined result from the reduce phase if applicable."""


# The payload for the map phase is a list of documents and their indexes.
# The current implementation only supports a single document per map operation,
# but the structure allows for future expansion to process a group of documents.
# A user would provide an input split function that returns groups of documents
# to process together, if desired.
class MapState(TypedDict):
    """State for individual map operations."""

    documents: list[Document]
    """List of documents to process in map phase."""
    indexes: list[int]
    """List of indexes of the documents in the original list."""


class InputSchema(TypedDict):
    """Input schema for the map-reduce extraction chain.

    Defines the expected input format when invoking the extraction chain.
    """

    documents: list[Document]
    """List of documents to process."""


class OutputSchema(TypedDict):
    """Output schema for the map-reduce extraction chain.

    Defines the format of the final result returned by the chain.
    """

    map_results: list[ExtractionResult]
    """List of individual extraction results from the map phase."""

    result: Any
    """Final combined result from all documents."""


class MapReduceNodeUpdate(TypedDict):
    """Update returned by map-reduce nodes."""

    map_results: NotRequired[list[ExtractionResult]]
    """Updated results after map phase."""
    result: NotRequired[Any]
    """Final result after reduce phase."""


class _MapReduceExtractor(Generic[ContextT]):
    """Map-reduce extraction implementation using LangGraph Send API.

    This implementation uses a language model to process documents through up
    to two phases:

    1. **Map Phase**: Each document is processed independently by the LLM using
       the configured map_prompt to generate individual extraction results.
    2. **Reduce Phase (Optional)**: Individual results can optionally be
       combined using either:
       - The default LLM-based reducer with the configured reduce_prompt
       - A custom reducer function (which can be non-LLM based)
       - Skipped entirely by setting reduce=None

    The map phase processes documents in parallel for efficiency, making this approach
    well-suited for large document collections. The reduce phase is flexible and can be
    customized or omitted based on your specific requirements.
    """

    def __init__(
        self,
        model: Union[BaseChatModel, str],
        *,
        map_prompt: Union[
            str,
            None,
            Callable[
                [MapState, Runtime[ContextT]],
                list[MessageLikeRepresentation],
            ],
        ] = None,
        reduce_prompt: Union[
            str,
            None,
            Callable[
                [MapReduceState, Runtime[ContextT]],
                list[MessageLikeRepresentation],
            ],
        ] = None,
        reduce: Union[
            Literal["default_reducer"],
            None,
            StateNode,
        ] = "default_reducer",
        context_schema: type[ContextT] | None = None,
        response_format: type[BaseModel] | None = None,
    ) -> None:
        """Initialize the MapReduceExtractor.

        Args:
            model: The language model, either a chat model instance
                (e.g., `ChatAnthropic()`) or string identifier
                (e.g., `"anthropic:claude-sonnet-4-20250514"`).
            map_prompt: Prompt for individual document processing. Can be:
                - str: A system message string
                - None: Use default system message
                - Callable: A function that takes (state, runtime) and returns messages
            reduce_prompt: Prompt for combining results. Can be:
                - str: A system message string
                - None: Use default system message
                - Callable: A function that takes (state, runtime) and returns messages
            reduce: Controls the reduce behavior. Can be:
                - "default_reducer": Use the default LLM-based reduce step
                - None: Skip the reduce step entirely
                - Callable: Custom reduce function (sync or async)
            context_schema: Optional context schema for the LangGraph runtime.
            response_format: Optional pydantic BaseModel for structured output.
        """
        if (reduce is None or callable(reduce)) and reduce_prompt is not None:
            msg = (
                "reduce_prompt must be None when reduce is None or a custom "
                "callable. Custom reduce functions handle their own logic and "
                "should not use reduce_prompt."
            )
            raise ValueError(msg)

        self.response_format = response_format

        if isinstance(model, str):
            model = init_chat_model(model)

        self.model = model.with_structured_output(response_format) if response_format else model
        self.map_prompt = map_prompt
        self.reduce_prompt = reduce_prompt
        self.reduce = reduce
        self.context_schema = context_schema

    def _get_map_prompt(
        self, state: MapState, runtime: Runtime[ContextT]
    ) -> list[MessageLikeRepresentation]:
        """Generate the LLM prompt for processing documents."""
        documents = state["documents"]
        user_content = "\n\n".join(format_document_xml(doc) for doc in documents)
        default_system = (
            "You are a helpful assistant that processes documents. "
            "Please process the following documents and provide a result."
        )

        return resolve_prompt(
            self.map_prompt,
            state,
            runtime,
            user_content,
            default_system,
        )

    async def _aget_map_prompt(
        self, state: MapState, runtime: Runtime[ContextT]
    ) -> list[MessageLikeRepresentation]:
        """Generate the LLM prompt for processing documents in the map phase.

        Async version.
        """
        documents = state["documents"]
        user_content = "\n\n".join(format_document_xml(doc) for doc in documents)
        default_system = (
            "You are a helpful assistant that processes documents. "
            "Please process the following documents and provide a result."
        )

        return await aresolve_prompt(
            self.map_prompt,
            state,
            runtime,
            user_content,
            default_system,
        )

    def _get_reduce_prompt(
        self, state: MapReduceState, runtime: Runtime[ContextT]
    ) -> list[MessageLikeRepresentation]:
        """Generate the LLM prompt for combining individual results.

        Combines map results in the reduce phase.
        """
        map_results = state.get("map_results", [])
        if not map_results:
            msg = (
                "Internal programming error: Results must exist when reducing. "
                "This indicates that the reduce node was reached without "
                "first processing the map nodes, which violates "
                "the expected graph execution order."
            )
            raise AssertionError(msg)

        results_text = "\n\n".join(
            f"Result {i + 1} (from documents "
            f"{', '.join(map(str, result['indexes']))}):\n{result['result']}"
            for i, result in enumerate(map_results)
        )
        user_content = (
            f"Please combine the following results into a single, "
            f"comprehensive result:\n\n{results_text}"
        )
        default_system = (
            "You are a helpful assistant that combines multiple results. "
            "Given several individual results, create a single comprehensive "
            "result that captures the key information from all inputs while "
            "maintaining conciseness and coherence."
        )

        return resolve_prompt(
            self.reduce_prompt,
            state,
            runtime,
            user_content,
            default_system,
        )

    async def _aget_reduce_prompt(
        self, state: MapReduceState, runtime: Runtime[ContextT]
    ) -> list[MessageLikeRepresentation]:
        """Generate the LLM prompt for combining individual results.

        Async version of reduce phase.
        """
        map_results = state.get("map_results", [])
        if not map_results:
            msg = (
                "Internal programming error: Results must exist when reducing. "
                "This indicates that the reduce node was reached without "
                "first processing the map nodes, which violates "
                "the expected graph execution order."
            )
            raise AssertionError(msg)

        results_text = "\n\n".join(
            f"Result {i + 1} (from documents "
            f"{', '.join(map(str, result['indexes']))}):\n{result['result']}"
            for i, result in enumerate(map_results)
        )
        user_content = (
            f"Please combine the following results into a single, "
            f"comprehensive result:\n\n{results_text}"
        )
        default_system = (
            "You are a helpful assistant that combines multiple results. "
            "Given several individual results, create a single comprehensive "
            "result that captures the key information from all inputs while "
            "maintaining conciseness and coherence."
        )

        return await aresolve_prompt(
            self.reduce_prompt,
            state,
            runtime,
            user_content,
            default_system,
        )

    def create_map_node(self) -> RunnableCallable:
        """Create a LangGraph node that processes individual documents using the LLM."""

        def _map_node(
            state: MapState, runtime: Runtime[ContextT], config: RunnableConfig
        ) -> dict[str, list[ExtractionResult]]:
            prompt = self._get_map_prompt(state, runtime)
            response = cast("AIMessage", self.model.invoke(prompt, config=config))
            result = response if self.response_format else response.text()
            extraction_result: ExtractionResult = {
                "indexes": state["indexes"],
                "result": result,
            }
            return {"map_results": [extraction_result]}

        async def _amap_node(
            state: MapState,
            runtime: Runtime[ContextT],
            config: RunnableConfig,
        ) -> dict[str, list[ExtractionResult]]:
            prompt = await self._aget_map_prompt(state, runtime)
            response = cast("AIMessage", await self.model.ainvoke(prompt, config=config))
            result = response if self.response_format else response.text()
            extraction_result: ExtractionResult = {
                "indexes": state["indexes"],
                "result": result,
            }
            return {"map_results": [extraction_result]}

        return RunnableCallable(
            _map_node,
            _amap_node,
            trace=False,
        )

    def create_reduce_node(self) -> RunnableCallable:
        """Create a LangGraph node that combines individual results using the LLM."""

        def _reduce_node(
            state: MapReduceState, runtime: Runtime[ContextT], config: RunnableConfig
        ) -> MapReduceNodeUpdate:
            prompt = self._get_reduce_prompt(state, runtime)
            response = cast("AIMessage", self.model.invoke(prompt, config=config))
            result = response if self.response_format else response.text()
            return {"result": result}

        async def _areduce_node(
            state: MapReduceState,
            runtime: Runtime[ContextT],
            config: RunnableConfig,
        ) -> MapReduceNodeUpdate:
            prompt = await self._aget_reduce_prompt(state, runtime)
            response = cast("AIMessage", await self.model.ainvoke(prompt, config=config))
            result = response if self.response_format else response.text()
            return {"result": result}

        return RunnableCallable(
            _reduce_node,
            _areduce_node,
            trace=False,
        )

    def continue_to_map(self, state: MapReduceState) -> list[Send]:
        """Generate Send objects for parallel map operations."""
        return [
            Send("map_process", {"documents": [doc], "indexes": [i]})
            for i, doc in enumerate(state["documents"])
        ]

    def build(
        self,
    ) -> StateGraph[MapReduceState, ContextT, InputSchema, OutputSchema]:
        """Build and compile the LangGraph for map-reduce summarization."""
        builder = StateGraph(
            MapReduceState,
            context_schema=self.context_schema,
            input_schema=InputSchema,
            output_schema=OutputSchema,
        )

        builder.add_node("map_process", self.create_map_node())

        builder.add_edge(START, "continue_to_map")
        # add_conditional_edges doesn't explicitly type Send
        builder.add_conditional_edges(
            "continue_to_map",
            self.continue_to_map,
            ["map_process"],
        )

        if self.reduce is None:
            builder.add_edge("map_process", END)
        elif self.reduce == "default_reducer":
            builder.add_node("reduce_process", self.create_reduce_node())
            builder.add_edge("map_process", "reduce_process")
            builder.add_edge("reduce_process", END)
        else:
            reduce_node = self.reduce
            # The type is ignored here. Requires parameterizing with generics.
            builder.add_node("reduce_process", reduce_node)  # type: ignore[arg-type]
            builder.add_edge("map_process", "reduce_process")
            builder.add_edge("reduce_process", END)

        return builder


def create_map_reduce_chain(
    model: Union[BaseChatModel, str],
    *,
    map_prompt: Union[
        str,
        None,
        Callable[[MapState, Runtime[ContextT]], list[MessageLikeRepresentation]],
    ] = None,
    reduce_prompt: Union[
        str,
        None,
        Callable[[MapReduceState, Runtime[ContextT]], list[MessageLikeRepresentation]],
    ] = None,
    reduce: Union[
        Literal["default_reducer"],
        None,
        StateNode,
    ] = "default_reducer",
    context_schema: type[ContextT] | None = None,
    response_format: type[BaseModel] | None = None,
) -> StateGraph[MapReduceState, ContextT, InputSchema, OutputSchema]:
    """Create a map-reduce document extraction chain.

    This implementation uses a language model to extract information from documents
    through a flexible approach that efficiently handles large document collections
    by processing documents in parallel.

    **Processing Flow:**

    1. **Map Phase**: Each document is independently processed by the LLM
       using the map_prompt to extract relevant information and generate
       individual results.
    2. **Reduce Phase (Optional)**: Individual extraction results can
       optionally be combined using:
       - The default LLM-based reducer with reduce_prompt (default behavior)
       - A custom reducer function (can be non-LLM based)
       - Skipped entirely by setting reduce=None
    3. **Output**: Returns the individual map results and optionally the final
       combined result.

    Example:
        >>> from langchain_anthropic import ChatAnthropic
        >>> from langchain_core.documents import Document
        >>>
        >>> model = ChatAnthropic(
        ...     model="claude-sonnet-4-20250514",
        ...     temperature=0,
        ...     max_tokens=62_000,
        ...     timeout=None,
        ...     max_retries=2,
        ... )
        >>> builder = create_map_reduce_chain(model)
        >>> chain = builder.compile()
        >>> docs = [
        ...     Document(page_content="First document content..."),
        ...     Document(page_content="Second document content..."),
        ...     Document(page_content="Third document content..."),
        ... ]
        >>> result = chain.invoke({"documents": docs})
        >>> print(result["result"])

    Example with string model:
        >>> builder = create_map_reduce_chain("anthropic:claude-sonnet-4-20250514")
        >>> chain = builder.compile()
        >>> result = chain.invoke({"documents": docs})
        >>> print(result["result"])

    Example with structured output:
        ```python
        from pydantic import BaseModel


        class ExtractionModel(BaseModel):
            title: str
            key_points: list[str]
            conclusion: str


        builder = create_map_reduce_chain(
            model,
            response_format=ExtractionModel,
        )
        chain = builder.compile()
        result = chain.invoke({"documents": docs})
        print(result["result"].title)  # Access structured fields
        ```

    Example skipping the reduce phase:
        ```python
        # Only perform map phase, skip combining results
        builder = create_map_reduce_chain(model, reduce=None)
        chain = builder.compile()
        result = chain.invoke({"documents": docs})
        # result["result"] will be None, only map_results are available
        for map_result in result["map_results"]:
            print(f"Document {map_result['indexes'][0]}: {map_result['result']}")
        ```

    Example with custom reducer:
        ```python
        def custom_aggregator(state, runtime):
            # Custom non-LLM based reduction logic
            map_results = state["map_results"]
            combined_text = " | ".join(r["result"] for r in map_results)
            word_count = len(combined_text.split())
            return {
                "result": f"Combined {len(map_results)} results with "
                f"{word_count} total words"
            }


        builder = create_map_reduce_chain(model, reduce=custom_aggregator)
        chain = builder.compile()
        result = chain.invoke({"documents": docs})
        print(result["result"])  # Custom aggregated result
        ```

    Args:
        model: The language model, either a chat model instance
            (e.g., `ChatAnthropic()`) or string identifier
            (e.g., `"anthropic:claude-sonnet-4-20250514"`).
        map_prompt: Prompt for individual document processing. Can be:
            - str: A system message string
            - None: Use default system message
            - Callable: A function that takes (state, runtime) and returns messages
        reduce_prompt: Prompt for combining results. Can be:
            - str: A system message string
            - None: Use default system message
            - Callable: A function that takes (state, runtime) and returns messages
        reduce: Controls the reduce behavior. Can be:
            - "default_reducer": Use the default LLM-based reduce step
            - None: Skip the reduce step entirely
            - Callable: Custom reduce function (sync or async)
        context_schema: Optional context schema for the LangGraph runtime.
        response_format: Optional pydantic BaseModel for structured output.

    Returns:
        A LangGraph that can be invoked with documents to get map-reduce
        extraction results.

    .. note::
        This implementation is well-suited for large document collections as it
        processes documents in parallel during the map phase. The Send API enables
        efficient parallelization while maintaining clean state management.
    """
    extractor = _MapReduceExtractor(
        model,
        map_prompt=map_prompt,
        reduce_prompt=reduce_prompt,
        reduce=reduce,
        context_schema=context_schema,
        response_format=response_format,
    )
    return extractor.build()


__all__ = ["create_map_reduce_chain"]
@@ -1,459 +0,0 @@
"""Stuff documents chain for processing documents by putting them all in context."""

from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    Generic,
    Union,
    cast,
)

# Used not only for type checking, but is fetched at runtime by Pydantic.
from langchain_core.documents import Document as Document  # noqa: TC002
from langgraph.graph import START, StateGraph
from typing_extensions import NotRequired, TypedDict

from langchain._internal._documents import format_document_xml
from langchain._internal._prompts import aresolve_prompt, resolve_prompt
from langchain._internal._typing import ContextT
from langchain._internal._utils import RunnableCallable
from langchain.chat_models import init_chat_model

if TYPE_CHECKING:
    from collections.abc import Callable

    from langchain_core.language_models.chat_models import BaseChatModel

    # Used for type checking, but IDEs may not recognize it inside the cast.
    from langchain_core.messages import AIMessage as AIMessage
    from langchain_core.messages import MessageLikeRepresentation
    from langchain_core.runnables import RunnableConfig
    from langgraph.runtime import Runtime
    from pydantic import BaseModel


# Default system prompts
DEFAULT_INIT_PROMPT = (
    "You are a helpful assistant that summarizes text. "
    "Please provide a concise summary of the documents "
    "provided by the user."
)

DEFAULT_STRUCTURED_INIT_PROMPT = (
    "You are a helpful assistant that extracts structured information from documents. "
    "Use the provided content and optional question to generate your output, formatted "
    "according to the predefined schema."
)

DEFAULT_REFINE_PROMPT = (
    "You are a helpful assistant that refines summaries. "
    "Given an existing summary and new context, produce a refined summary "
    "that incorporates the new information while maintaining conciseness."
)

DEFAULT_STRUCTURED_REFINE_PROMPT = (
    "You are a helpful assistant refining structured information extracted "
    "from documents. "
    "You are given a previous result and new document context. "
    "Update the output to reflect the new context, staying consistent with "
    "the expected schema."
)


def _format_documents_content(documents: list[Document]) -> str:
    """Format documents into a content string.

    Args:
        documents: List of documents to format.

    Returns:
        Formatted document content string.
    """
    return "\n\n".join(format_document_xml(doc) for doc in documents)


class ExtractionState(TypedDict):
    """State for extraction chain.

    This state tracks the extraction process where documents
    are processed in batch, with the result being refined if needed.
    """

    documents: list[Document]
    """List of documents to process."""
    result: NotRequired[Any]
    """Current result, refined with each document."""


class InputSchema(TypedDict):
    """Input schema for the extraction chain.

    Defines the expected input format when invoking the extraction chain.
    """

    documents: list[Document]
    """List of documents to process."""
    result: NotRequired[Any]
    """Existing result to refine (optional)."""


class OutputSchema(TypedDict):
    """Output schema for the extraction chain.

    Defines the format of the final result returned by the chain.
    """

    result: Any
    """Result from processing the documents."""


class ExtractionNodeUpdate(TypedDict):
    """Update returned by processing nodes."""

    result: NotRequired[Any]
    """Updated result after processing a document."""


class _Extractor(Generic[ContextT]):
    """Stuff documents chain implementation.

    This chain works by putting all the documents in the batch into the context
    window of the language model. It processes all documents together in a single
    request for extracting information or summaries. Can refine existing results
    when provided.

    Important: This chain does not attempt to control for the size of the context
    window of the LLM. Ensure your documents fit within the model's context limits.
    """

    def __init__(
        self,
        model: Union[BaseChatModel, str],
        *,
        prompt: Union[
            str,
            None,
            Callable[
                [ExtractionState, Runtime[ContextT]],
                list[MessageLikeRepresentation],
            ],
        ] = None,
        refine_prompt: Union[
            str,
            None,
            Callable[
                [ExtractionState, Runtime[ContextT]],
                list[MessageLikeRepresentation],
            ],
        ] = None,
        context_schema: type[ContextT] | None = None,
        response_format: type[BaseModel] | None = None,
    ) -> None:
        """Initialize the Extractor.

        Args:
            model: The language model, either a chat model instance
                (e.g., `ChatAnthropic()`) or string identifier
                (e.g., `"anthropic:claude-sonnet-4-20250514"`).
            prompt: Prompt for initial processing. Can be:
                - str: A system message string
                - None: Use default system message
                - Callable: A function that takes (state, runtime) and returns messages
            refine_prompt: Prompt for refinement steps. Can be:
                - str: A system message string
                - None: Use default system message
                - Callable: A function that takes (state, runtime) and returns messages
            context_schema: Optional context schema for the LangGraph runtime.
            response_format: Optional pydantic BaseModel for structured output.
        """
        self.response_format = response_format

        if isinstance(model, str):
            model = init_chat_model(model)

        self.model = model.with_structured_output(response_format) if response_format else model
        self.initial_prompt = prompt
        self.refine_prompt = refine_prompt
        self.context_schema = context_schema

    def _get_initial_prompt(
        self, state: ExtractionState, runtime: Runtime[ContextT]
    ) -> list[MessageLikeRepresentation]:
        """Generate the initial extraction prompt."""
        user_content = _format_documents_content(state["documents"])

        # Choose default prompt based on structured output format
        default_prompt = (
            DEFAULT_STRUCTURED_INIT_PROMPT if self.response_format else DEFAULT_INIT_PROMPT
        )

        return resolve_prompt(
            self.initial_prompt,
            state,
            runtime,
            user_content,
            default_prompt,
        )

    async def _aget_initial_prompt(
        self, state: ExtractionState, runtime: Runtime[ContextT]
    ) -> list[MessageLikeRepresentation]:
        """Generate the initial extraction prompt (async version)."""
        user_content = _format_documents_content(state["documents"])

        # Choose default prompt based on structured output format
        default_prompt = (
            DEFAULT_STRUCTURED_INIT_PROMPT if self.response_format else DEFAULT_INIT_PROMPT
        )

        return await aresolve_prompt(
            self.initial_prompt,
            state,
            runtime,
            user_content,
            default_prompt,
        )

    def _get_refine_prompt(
        self, state: ExtractionState, runtime: Runtime[ContextT]
    ) -> list[MessageLikeRepresentation]:
        """Generate the refinement prompt."""
        # Result should be guaranteed to exist at refinement stage
        if "result" not in state or state["result"] == "":
            msg = (
                "Internal programming error: Result must exist when refining. "
                "This indicates that the refinement node was reached without "
                "first processing the initial result node, which violates "
                "the expected graph execution order."
            )
            raise AssertionError(msg)

        new_context = _format_documents_content(state["documents"])

        user_content = (
            f"Previous result:\n{state['result']}\n\n"
            f"New context:\n{new_context}\n\n"
            f"Please provide a refined result."
        )

        # Choose default prompt based on structured output format
        default_prompt = (
            DEFAULT_STRUCTURED_REFINE_PROMPT if self.response_format else DEFAULT_REFINE_PROMPT
        )

        return resolve_prompt(
            self.refine_prompt,
            state,
            runtime,
            user_content,
            default_prompt,
        )

    async def _aget_refine_prompt(
        self, state: ExtractionState, runtime: Runtime[ContextT]
    ) -> list[MessageLikeRepresentation]:
        """Generate the refinement prompt (async version)."""
        # Result should be guaranteed to exist at refinement stage
        if "result" not in state or state["result"] == "":
            msg = (
                "Internal programming error: Result must exist when refining. "
                "This indicates that the refinement node was reached without "
                "first processing the initial result node, which violates "
                "the expected graph execution order."
            )
            raise AssertionError(msg)

        new_context = _format_documents_content(state["documents"])

        user_content = (
            f"Previous result:\n{state['result']}\n\n"
            f"New context:\n{new_context}\n\n"
            f"Please provide a refined result."
        )

        # Choose default prompt based on structured output format
        default_prompt = (
            DEFAULT_STRUCTURED_REFINE_PROMPT if self.response_format else DEFAULT_REFINE_PROMPT
        )

        return await aresolve_prompt(
            self.refine_prompt,
            state,
            runtime,
            user_content,
            default_prompt,
        )

    def create_document_processor_node(self) -> RunnableCallable:
        """Create the main document processing node.

        The node handles both initial processing and refinement of results.

        Refinement is done by providing the existing result and new context.

        If the workflow is run with a checkpointer enabled, the result will be
        persisted and available for a given thread id.
        """

        def _process_node(
            state: ExtractionState, runtime: Runtime[ContextT], config: RunnableConfig
        ) -> ExtractionNodeUpdate:
            # Handle empty document list
            if not state["documents"]:
                return {}

            # Determine if this is initial processing or refinement
            if "result" not in state or state["result"] == "":
                # Initial processing
                prompt = self._get_initial_prompt(state, runtime)
                response = cast("AIMessage", self.model.invoke(prompt, config=config))
                result = response if self.response_format else response.text()
                return {"result": result}
            # Refinement
            prompt = self._get_refine_prompt(state, runtime)
            response = cast("AIMessage", self.model.invoke(prompt, config=config))
            result = response if self.response_format else response.text()
            return {"result": result}

        async def _aprocess_node(
            state: ExtractionState,
            runtime: Runtime[ContextT],
            config: RunnableConfig,
        ) -> ExtractionNodeUpdate:
            # Handle empty document list
            if not state["documents"]:
                return {}

            # Determine if this is initial processing or refinement
            if "result" not in state or state["result"] == "":
                # Initial processing
                prompt = await self._aget_initial_prompt(state, runtime)
                response = cast("AIMessage", await self.model.ainvoke(prompt, config=config))
                result = response if self.response_format else response.text()
                return {"result": result}
            # Refinement
            prompt = await self._aget_refine_prompt(state, runtime)
            response = cast("AIMessage", await self.model.ainvoke(prompt, config=config))
            result = response if self.response_format else response.text()
            return {"result": result}

        return RunnableCallable(
            _process_node,
            _aprocess_node,
            trace=False,
        )

    def build(
        self,
    ) -> StateGraph[ExtractionState, ContextT, InputSchema, OutputSchema]:
        """Build and compile the LangGraph for batch document extraction."""
        builder = StateGraph(
            ExtractionState,
            context_schema=self.context_schema,
            input_schema=InputSchema,
            output_schema=OutputSchema,
        )
        builder.add_edge(START, "process")
        builder.add_node("process", self.create_document_processor_node())
        return builder


def create_stuff_documents_chain(
    model: Union[BaseChatModel, str],
    *,
    prompt: Union[
        str,
        None,
        Callable[[ExtractionState, Runtime[ContextT]], list[MessageLikeRepresentation]],
    ] = None,
    refine_prompt: Union[
        str,
        None,
        Callable[[ExtractionState, Runtime[ContextT]], list[MessageLikeRepresentation]],
    ] = None,
    context_schema: type[ContextT] | None = None,
    response_format: type[BaseModel] | None = None,
) -> StateGraph[ExtractionState, ContextT, InputSchema, OutputSchema]:
    """Create a stuff documents chain for processing documents.

    This chain works by putting all the documents in the batch into the context
    window of the language model. It processes all documents together in a single
    request for extracting information or summaries. Can refine existing results
    when provided. The default prompts are optimized for summarization tasks, but
    can be customized for other extraction tasks via the prompt parameters or
    response_format.

    Strategy:
    1. Put all documents into the context window
    2. Process all documents together in a single request
    3. If an existing result is provided, refine it with all documents at once
    4. Return the result

    Important:
        This chain does not attempt to control for the size of the context
        window of the LLM. Ensure your documents fit within the model's context limits.

    Example:
        ```python
        from langchain.chat_models import init_chat_model
        from langchain_core.documents import Document

        model = init_chat_model("anthropic:claude-sonnet-4-20250514")
        builder = create_stuff_documents_chain(model)
        chain = builder.compile()
        docs = [
            Document(page_content="First document content..."),
            Document(page_content="Second document content..."),
            Document(page_content="Third document content..."),
        ]
        result = chain.invoke({"documents": docs})
        print(result["result"])

        # Structured summary/extraction by passing a schema
        from pydantic import BaseModel


        class Summary(BaseModel):
            title: str
            key_points: list[str]


        builder = create_stuff_documents_chain(model, response_format=Summary)
        chain = builder.compile()
        result = chain.invoke({"documents": docs})
        print(result["result"].title)  # Access structured fields
        ```

    Args:
        model: The language model for document processing.
        prompt: Prompt for initial processing. Can be:
            - str: A system message string
            - None: Use default system message
            - Callable: A function that takes (state, runtime) and returns messages
        refine_prompt: Prompt for refinement steps. Can be:
            - str: A system message string
            - None: Use default system message
            - Callable: A function that takes (state, runtime) and returns messages
        context_schema: Optional context schema for the LangGraph runtime.
        response_format: Optional pydantic BaseModel for structured output.

    Returns:
        A LangGraph that can be invoked with documents to extract information.

    .. note::
        This is a "stuff" documents chain that puts all documents into the context
        window and processes them together. It supports refining existing results.
        Default prompts are optimized for summarization but can be customized for
        other tasks. Important: Does not control for context window size.
    """
    extractor = _Extractor(
        model,
        prompt=prompt,
        refine_prompt=refine_prompt,
        context_schema=context_schema,
        response_format=response_format,
    )
    return extractor.build()


__all__ = ["create_stuff_documents_chain"]
@@ -1,15 +0,0 @@
"""Global values and configuration that apply to all of LangChain."""

from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from langchain_core.caches import BaseCache


# DO NOT USE THESE VALUES DIRECTLY!
# Use them only via `get_<X>()` and `set_<X>()` below,
# or else your code may behave unexpectedly with other uses of these global settings:
# https://github.com/langchain-ai/langchain/pull/11311#issuecomment-1743780004
_verbose: bool = False
_debug: bool = False
_llm_cache: Optional["BaseCache"] = None
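
For context, these module-level values were read and written only through the paired accessors defined further down in the removed globals.py, as the comment above notes. A minimal sketch of that pattern, assuming the get_debug/set_debug accessors the file provided:

# Hypothetical usage sketch; the accessors themselves are not shown in the truncated hunk above.
from langchain.globals import get_debug, set_debug

set_debug(True)  # updates the module-level _debug flag through the setter
assert get_debug() is True  # reads it back through the getter, never via _debug directly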