To make it as easy as possible to create custom chains, we've implemented a ["Runnable"](https://api.python.langchain.com/en/latest/schema/langchain.schema.runnable.Runnable.html#langchain.schema.runnable.Runnable) protocol that most components implement. This standard interface has a small set of methods, which makes it easy to define custom chains and to invoke them in a uniform way. The standard interface includes:

- [`stream`](#stream): stream back chunks of the response
- [`invoke`](#invoke): call the chain on an input
- [`batch`](#batch): call the chain on a list of inputs

These also have corresponding async methods:

- [`astream`](#async-stream): stream back chunks of the response async
- [`ainvoke`](#async-invoke): call the chain on an input async
- [`abatch`](#async-batch): call the chain on a list of inputs async
- [`astream_log`](#async-stream-intermediate-steps): stream back intermediate steps as they happen, in addition to the final response
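
To make the shape of this interface concrete, here is a minimal sketch of an object exposing the three synchronous methods. `ToyRunnable` is a hypothetical stand-in written for illustration; it is not LangChain's implementation and it makes no model calls.

```python
from typing import Iterator, List


class ToyRunnable:
    """Hypothetical stand-in for a Runnable; not LangChain's implementation."""

    def invoke(self, text: str) -> str:
        # Call the component on a single input.
        return text.upper()

    def batch(self, texts: List[str]) -> List[str]:
        # Call the component on a list of inputs, preserving input order.
        return [self.invoke(text) for text in texts]

    def stream(self, text: str) -> Iterator[str]:
        # Yield the response in chunks (here, one word at a time).
        for word in self.invoke(text).split():
            yield word + " "


runnable = ToyRunnable()
print(runnable.invoke("hello bears"))
print(runnable.batch(["bears", "cats"]))
print("".join(runnable.stream("hello bears")))
```

Because every component answers to the same method names, calling code does not need to know which kind of component it is holding.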

The type of the input varies by component:

| Component | Input Type |
| --- | --- |
| Prompt | Dictionary |
| Retriever | Single string |
| LLM, ChatModel | Single string, list of chat messages, or a PromptValue |
| Tool | Single string or dictionary, depending on the tool |
| OutputParser | The output of an LLM or ChatModel |

The output type also varies by component:

| Component | Output Type |
| --- | --- |
| LLM | String |
| ChatModel | ChatMessage |
| Prompt | PromptValue |
| Retriever | List of documents |
| Tool | Depends on the tool |
| OutputParser | Depends on the parser |

All runnables expose properties to inspect the input and output types:
- [`input_schema`](#input-schema): an input Pydantic model auto-generated from the structure of the Runnable
- [`output_schema`](#output-schema): an output Pydantic model auto-generated from the structure of the Runnable

Let's take a look at these methods! To do so, we'll create a super simple PromptTemplate + ChatModel chain.

In [1]:
from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI


In [2]:
model = ChatOpenAI()


In [3]:
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")


In [4]:
chain = prompt | model
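
One way to picture what `prompt | model` does is Python's `__or__` operator hook: piping two steps yields a new step that feeds the output of the first into the second. The sketch below assumes this general technique. The `Step` class and the `toy_*` names are hypothetical and do not reflect how LangChain's classes are written.

```python
from typing import Any, Callable


class Step:
    """Hypothetical wrapper illustrating `|` composition; not LangChain's classes."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: "Step") -> "Step":
        # `a | b` builds a new step that passes a's output to b.
        return Step(lambda value: other.invoke(self.invoke(value)))


# Stand-ins for a prompt template and a chat model (no network calls).
toy_prompt = Step(lambda inputs: f"tell me a joke about {inputs['topic']}")
toy_model = Step(lambda text: f"[model reply to: {text}]")

toy_chain = toy_prompt | toy_model
print(toy_chain.invoke({"topic": "bears"}))
# prints: [model reply to: tell me a joke about bears]
```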


## Input Schema

A description of the inputs accepted by a Runnable.
This is a Pydantic model dynamically generated from the structure of any Runnable.
You can call `.schema()` on it to obtain a JSONSchema representation.

In [5]:
# The input schema of the chain is the input schema of its first part, the prompt.
chain.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

## Output Schema

A description of the outputs produced by a Runnable.
This is a Pydantic model dynamically generated from the structure of any Runnable.
You can call `.schema()` on it to obtain a JSONSchema representation.

In [6]:
# The output schema of the chain is the output schema of its last part, in this case a ChatModel, which outputs a ChatMessage
chain.output_schema.schema()

{'title': 'ChatOpenAIOutput',
 'anyOf': [{'$ref': '#/definitions/HumanMessageChunk'},
  {'$ref': '#/definitions/AIMessageChunk'},
  {'$ref': '#/definitions/ChatMessageChunk'},
  {'$ref': '#/definitions/FunctionMessageChunk'},
  {'$ref': '#/definitions/SystemMessageChunk'}],
 'definitions': {'HumanMessageChunk': {'title': 'HumanMessageChunk',
   'description': 'A Human Message chunk.',
   'type': 'object',
   'properties': {'content': {'title': 'Content', 'type': 'string'},
    'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},
    'type': {'title': 'Type',
     'default': 'human',
     'enum': ['human'],
     'type': 'string'},
    'example': {'title': 'Example', 'default': False, 'type': 'boolean'},
    'is_chunk': {'title': 'Is Chunk',
     'default': True,
     'enum': [True],
     'type': 'boolean'}},
   'required': ['content']},
  'AIMessageChunk': {'title': 'AIMessageChunk',
   'description': 'A Message chunk from an AI.',
   'type': 'object',
   'properties':

## Stream

In [7]:
for s in chain.stream({"topic": "bears"}):
    print(s.content, end="", flush=True)


Why don't bears wear shoes? 

Because they have bear feet!

## Invoke

In [8]:
chain.invoke({"topic": "bears"})


AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!")

## Batch

In [9]:
chain.batch([{"topic": "bears"}, {"topic": "cats"}])


[AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!"),
 AIMessage(content="Why don't cats play poker in the wild?\n\nToo many cheetahs!")]

You can set the number of concurrent requests with the `max_concurrency` parameter, passed in the `config` dictionary:

In [10]:
chain.batch([{"topic": "bears"}, {"topic": "cats"}], config={"max_concurrency": 5})


[AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!"),
 AIMessage(content="Sure, here's a cat joke for you:\n\nWhy don't cats play poker in the wild?\n\nToo many cheetahs!")]
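
As a rough mental model for a concurrency cap, the sketch below runs a batch through a thread pool whose size bounds how many calls are in flight at once. This is an assumption about the general technique, not a description of LangChain's internals. `toy_batch` and `fake_model_call` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def toy_batch(
    func: Callable[[str], str], inputs: List[str], max_concurrency: int
) -> List[str]:
    """Hypothetical helper: run func over inputs with at most max_concurrency workers."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # Executor.map returns results in input order, even if calls finish out of order.
        return list(pool.map(func, inputs))


def fake_model_call(topic: str) -> str:
    # Stand-in for a network request to a model.
    return f"joke about {topic}"


jokes = toy_batch(fake_model_call, ["bears", "cats", "dogs"], max_concurrency=2)
print(jokes)
```

A lower cap can help stay within a provider's rate limits, at the cost of a longer total wall time for large batches.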

## Async Stream

In [11]:
async for s in chain.astream({"topic": "bears"}):
    print(s.content, end="", flush=True)


Sure, here's a bear joke for you:

Why don't bears wear shoes?

Because they have bear feet!
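
The top-level `async for` above works in a notebook, which already runs an event loop. In a plain Python script, the same loop has to live inside a coroutine started with `asyncio.run`. The sketch below shows that pattern with a hypothetical `toy_astream` generator standing in for `chain.astream`.

```python
import asyncio
from typing import AsyncIterator, List


async def toy_astream(topic: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for chain.astream: yields chunks asynchronously."""
    for chunk in ["Why ", "don't ", topic, " wear shoes?"]:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield chunk


async def collect(topic: str) -> List[str]:
    # `async for` is only valid inside a coroutine when running as a script.
    received = []
    async for chunk in toy_astream(topic):
        received.append(chunk)
    return received


chunks = asyncio.run(collect("bears"))
print("".join(chunks))
```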

## Async Invoke

In [12]:
await chain.ainvoke({"topic": "bears"})


AIMessage(content="Why don't bears wear shoes? \n\nBecause they have bear feet!")

## Async Batch

In [13]:
await chain.abatch([{"topic": "bears"}])


[AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!")]
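
To illustrate what an async batch can look like in general, the sketch below fans out several coroutine calls with `asyncio.gather`, which returns results in the order the awaitables were passed. `toy_ainvoke` and `toy_abatch` are hypothetical stand-ins. Whether LangChain uses this exact approach is an assumption.

```python
import asyncio
from typing import List


async def toy_ainvoke(topic: str) -> str:
    """Hypothetical stand-in for chain.ainvoke."""
    await asyncio.sleep(0.01)  # simulate waiting on a network response
    return f"joke about {topic}"


async def toy_abatch(topics: List[str]) -> List[str]:
    # Start all calls at once and wait for every one of them to finish.
    return list(await asyncio.gather(*(toy_ainvoke(topic) for topic in topics)))


results = asyncio.run(toy_abatch(["bears", "cats"]))
print(results)
```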

## Async Stream Intermediate Steps

All runnables also have an `.astream_log()` method, which streams all or part of the intermediate steps of your chain/sequence as they happen.

This is useful, e.g., to show progress to the user, to use intermediate results, or just to debug your chain.

You can stream all steps (the default), or include/exclude steps by name, tags, or metadata.

This method yields [JSONPatch](https://jsonpatch.com) ops that, when applied in the same order as received, build up the RunState.

```python
from typing import Any, Dict, List, Optional, TypedDict


class LogEntry(TypedDict):
    id: str
    """ID of the sub-run."""
    name: str
    """Name of the object being run."""
    type: str
    """Type of the object being run, e.g. prompt, chain, llm, etc."""
    tags: List[str]
    """List of tags for the run."""
    metadata: Dict[str, Any]
    """Key-value pairs of metadata for the run."""
    start_time: str
    """ISO-8601 timestamp of when the run started."""

    streamed_output_str: List[str]
    """List of LLM tokens streamed by this run, if applicable."""
    final_output: Optional[Any]
    """Final output of this run.
    Only available after the run has finished successfully."""
    end_time: Optional[str]
    """ISO-8601 timestamp of when the run ended.
    Only available after the run has finished."""


class RunState(TypedDict):
    id: str
    """ID of the run."""
    streamed_output: List[Any]
    """List of output chunks streamed by Runnable.stream()"""
    final_output: Optional[Any]
    """Final output of the run, usually the result of aggregating (`+`) streamed_output.
    Only available after the run has finished successfully."""

    logs: Dict[str, LogEntry]
    """Map of run names to sub-runs. If filters were supplied, this map will
    contain only the runs that matched the filters."""
```
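
To show how a sequence of ops can build up a state of this shape, here is a minimal sketch that applies only the `add` and `replace` operations to plain dictionaries and lists. `apply_ops` is a hypothetical helper, the IDs and values are made up, and JSON Pointer escaping is ignored. A dedicated library such as `jsonpatch` would be a more robust choice in real code.

```python
from typing import Any, Dict, List


def apply_ops(state: Any, ops: List[Dict[str, Any]]) -> Any:
    """Hypothetical helper: apply 'add'/'replace' JSONPatch ops and return the state."""
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {op['op']}")
        path, value = op["path"], op["value"]
        if path == "":
            # An empty path targets the whole document.
            state = value
            continue
        *parents, last = path[1:].split("/")
        target = state
        for key in parents:
            target = target[int(key)] if isinstance(target, list) else target[key]
        if isinstance(target, list):
            if last == "-":
                target.append(value)  # "-" means "append to the end"
            elif op["op"] == "add":
                target.insert(int(last), value)
            else:
                target[int(last)] = value
        else:
            target[last] = value
    return state


# Made-up patches shaped like a RunState being built up.
patches = [
    [{"op": "replace", "path": "",
      "value": {"id": "run-1", "streamed_output": [], "final_output": None, "logs": {}}}],
    [{"op": "add", "path": "/logs/Docs", "value": {"name": "Docs", "final_output": None}}],
    [{"op": "add", "path": "/logs/Docs/final_output",
      "value": {"documents": ["harrison worked at kensho"]}}],
    [{"op": "add", "path": "/streamed_output/-", "value": "H"}],
    [{"op": "add", "path": "/streamed_output/-", "value": "arrison"}],
]

state = None
for ops in patches:
    state = apply_ops(state, ops)
print(state["streamed_output"])
```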

### Streaming JSONPatch chunks

This is useful, e.g., to stream the JSONPatch ops from an HTTP server and then apply them on the client to rebuild the run state there. See [LangServe](https://github.com/langchain-ai/langserve) for tooling that makes it easier to build a web server from any Runnable.

In [14]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.vectorstores import FAISS

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(["harrison worked at kensho"], embedding=OpenAIEmbeddings())
retriever = vectorstore.as_retriever()

retrieval_chain = (
    {"context": retriever.with_config(run_name='Docs'), "question": RunnablePassthrough()}
    | prompt 
    | model 
    | StrOutputParser()
)

async for chunk in retrieval_chain.astream_log("where did harrison work?", include_names=['Docs']):
    print(chunk)


RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': 'fd6fcf62-c92c-4edf-8713-0fc5df000f62',
            'logs': {},
            'streamed_output': []}})
RunLogPatch({'op': 'add',
  'path': '/logs/Docs',
  'value': {'end_time': None,
            'final_output': None,
            'id': '8c998257-1ec8-4546-b744-c3fdb9728c41',
            'metadata': {},
            'name': 'Docs',
            'start_time': '2023-10-05T12:52:35.668',
            'streamed_output_str': [],
            'tags': ['map:key:context', 'FAISS'],
            'type': 'retriever'}})
RunLogPatch({'op': 'add',
  'path': '/logs/Docs/final_output',
  'value': {'documents': [Document(page_content='harrison worked at kensho')]}},
 {'op': 'add',
  'path': '/logs/Docs/end_time',
  'value': '2023-10-05T12:52:36.033'})
RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': ''})
RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': 'H'})
RunLogPatch({'op': 'add', 

### Streaming the incremental RunState

Pass `diff=False` to receive the accumulated `RunState` at each step instead of the individual patches.

In [15]:
async for chunk in retrieval_chain.astream_log("where did harrison work?", include_names=['Docs'], diff=False):
    print(chunk)

RunLog({'final_output': None,
 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',
 'logs': {},
 'streamed_output': []})
RunLog({'final_output': None,
 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',
 'logs': {'Docs': {'end_time': None,
                   'final_output': None,
                   'id': '621597dd-d716-4532-938d-debc21a453d1',
                   'metadata': {},
                   'name': 'Docs',
                   'start_time': '2023-10-05T12:52:36.935',
                   'streamed_output_str': [],
                   'tags': ['map:key:context', 'FAISS'],
                   'type': 'retriever'}},
 'streamed_output': []})
RunLog({'final_output': None,
 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',
 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',
                   'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},
                   'id': '621597dd-d716-4532-938d-debc21a453d1',
                   'metadata': {},
                   'name':

## Parallelism

Let's take a look at how LangChain Expression Language runs requests in parallel wherever possible. For example, when you use a `RunnableMap` (often written as a dictionary), it executes each element in parallel.

In [7]:
from langchain.schema.runnable import RunnableMap

chain1 = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
chain2 = ChatPromptTemplate.from_template("write a short (2 line) poem about {topic}") | model
combined = RunnableMap({
    "joke": chain1,
    "poem": chain2,
})


In [11]:
%%time
chain1.invoke({"topic": "bears"})


CPU times: user 31.7 ms, sys: 8.59 ms, total: 40.3 ms
Wall time: 1.05 s


AIMessage(content="Why don't bears like fast food?\n\nBecause they can't catch it!", additional_kwargs={}, example=False)

In [12]:
%%time
chain2.invoke({"topic": "bears"})


CPU times: user 42.9 ms, sys: 10.2 ms, total: 53 ms
Wall time: 1.93 s


AIMessage(content="In forest's embrace, bears roam free,\nSilent strength, nature's majesty.", additional_kwargs={}, example=False)

In [13]:
%%time
combined.invoke({"topic": "bears"})


CPU times: user 96.3 ms, sys: 20.4 ms, total: 117 ms
Wall time: 1.1 s


{'joke': AIMessage(content="Why don't bears wear socks?\n\nBecause they have bear feet!", additional_kwargs={}, example=False),
 'poem': AIMessage(content="In forest's embrace,\nMajestic bears leave their trace.", additional_kwargs={}, example=False)}
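
The timings above show the combined call taking about as long as the slower of the two chains rather than their sum. The sketch below reproduces that effect with a thread pool and two sleeping functions. `run_map`, `slow_joke`, and `slow_poem` are hypothetical names, and using threads here is an assumption made for illustration rather than a statement about how `RunnableMap` is implemented.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_map(steps: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Hypothetical helper: run every step on the same input in parallel."""
    with ThreadPoolExecutor(max_workers=len(steps)) as pool:
        futures = {name: pool.submit(step, value) for name, step in steps.items()}
        return {name: future.result() for name, future in futures.items()}


def slow_joke(inputs: Dict[str, str]) -> str:
    time.sleep(0.3)  # simulate a model call
    return f"joke about {inputs['topic']}"


def slow_poem(inputs: Dict[str, str]) -> str:
    time.sleep(0.3)  # simulate a model call
    return f"poem about {inputs['topic']}"


start = time.perf_counter()
result = run_map({"joke": slow_joke, "poem": slow_poem}, {"topic": "bears"})
elapsed = time.perf_counter() - start
print(result, f"{elapsed:.2f}s")
```

Since both steps mostly wait, the elapsed time should be close to 0.3 seconds rather than 0.6 seconds.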