{ "cells": [ { "cell_type": "raw", "id": "ce0e08fd", "metadata": {}, "source": [ "---\n", "sidebar_position: 3\n", "title: \"Lambda: Run custom functions\"\n", "keywords: [RunnableLambda, LCEL]\n", "---" ] }, { "cell_type": "markdown", "id": "fbc4bf6e", "metadata": {}, "source": [ "# Run custom functions\n", "\n", "You can use arbitrary functions in the pipeline.\n", "\n", "Note that all inputs to these functions need to be a SINGLE argument. If you have a function that accepts multiple arguments, you should write a wrapper that accepts a single input and unpacks it into multiple argument." ] }, { "cell_type": "raw", "id": "9a5fe916", "metadata": {}, "source": [ "%pip install --upgrade --quiet langchain langchain-openai" ] }, { "cell_type": "code", "execution_count": 1, "id": "6bb221b3", "metadata": {}, "outputs": [], "source": [ "from operator import itemgetter\n", "\n", "from langchain_core.prompts import ChatPromptTemplate\n", "from langchain_core.runnables import RunnableLambda\n", "from langchain_openai import ChatOpenAI\n", "\n", "\n", "def length_function(text):\n", " return len(text)\n", "\n", "\n", "def _multiple_length_function(text1, text2):\n", " return len(text1) * len(text2)\n", "\n", "\n", "def multiple_length_function(_dict):\n", " return _multiple_length_function(_dict[\"text1\"], _dict[\"text2\"])\n", "\n", "\n", "prompt = ChatPromptTemplate.from_template(\"what is {a} + {b}\")\n", "model = ChatOpenAI()\n", "\n", "chain1 = prompt | model\n", "\n", "chain = (\n", " {\n", " \"a\": itemgetter(\"foo\") | RunnableLambda(length_function),\n", " \"b\": {\"text1\": itemgetter(\"foo\"), \"text2\": itemgetter(\"bar\")}\n", " | RunnableLambda(multiple_length_function),\n", " }\n", " | prompt\n", " | model\n", ")" ] }, { "cell_type": "code", "execution_count": 2, "id": "5488ec85", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "AIMessage(content='3 + 9 = 12', response_metadata={'token_usage': {'completion_tokens': 7, 'prompt_tokens': 14, 'total_tokens': 21}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_b28b39ffa8', 'finish_reason': 'stop', 'logprobs': None}, id='run-bd204541-81fd-429a-ad92-dd1913af9b1c-0')" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "chain.invoke({\"foo\": \"bar\", \"bar\": \"gah\"})" ] }, { "cell_type": "markdown", "id": "4728ddd9-914d-42ce-ae9b-72c9ce8ec940", "metadata": {}, "source": [ "## Accepting a Runnable Config\n", "\n", "Runnable lambdas can optionally accept a [RunnableConfig](https://api.python.langchain.com/en/latest/runnables/langchain_core.runnables.config.RunnableConfig.html#langchain_core.runnables.config.RunnableConfig), which they can use to pass callbacks, tags, and other configuration information to nested runs." ] }, { "cell_type": "code", "execution_count": 3, "id": "80b3b5f6-5d58-44b9-807e-cce9a46bf49f", "metadata": {}, "outputs": [], "source": [ "from langchain_core.output_parsers import StrOutputParser\n", "from langchain_core.runnables import RunnableConfig" ] }, { "cell_type": "code", "execution_count": 4, "id": "ff0daf0c-49dd-4d21-9772-e5fa133c5f36", "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "\n", "def parse_or_fix(text: str, config: RunnableConfig):\n", " fixing_chain = (\n", " ChatPromptTemplate.from_template(\n", " \"Fix the following text:\\n\\n```text\\n{input}\\n```\\nError: {error}\"\n", " \" Don't narrate, just respond with the fixed data.\"\n", " )\n", " | ChatOpenAI()\n", " | StrOutputParser()\n", " )\n", " for _ in range(3):\n", " try:\n", " return json.loads(text)\n", " except Exception as e:\n", " text = fixing_chain.invoke({\"input\": text, \"error\": e}, config)\n", " return \"Failed to parse\"" ] }, { "cell_type": "code", "execution_count": 5, "id": "1a5e709e-9d75-48c7-bb9c-503251990505", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'foo': 'bar'}\n", "Tokens Used: 62\n", "\tPrompt Tokens: 56\n", "\tCompletion Tokens: 6\n", "Successful Requests: 1\n", "Total Cost (USD): $9.6e-05\n" ] } ], "source": [ "from langchain_community.callbacks import get_openai_callback\n", "\n", "with get_openai_callback() as cb:\n", " output = RunnableLambda(parse_or_fix).invoke(\n", " \"{foo: bar}\", {\"tags\": [\"my-tag\"], \"callbacks\": [cb]}\n", " )\n", " print(output)\n", " print(cb)" ] }, { "cell_type": "markdown", "id": "922b48bd", "metadata": {}, "source": [ "# Streaming\n", "\n", "You can use generator functions (ie. functions that use the `yield` keyword, and behave like iterators) in a LCEL pipeline.\n", "\n", "The signature of these generators should be `Iterator[Input] -> Iterator[Output]`. Or for async generators: `AsyncIterator[Input] -> AsyncIterator[Output]`.\n", "\n", "These are useful for:\n", "- implementing a custom output parser\n", "- modifying the output of a previous step, while preserving streaming capabilities\n", "\n", "Here's an example of a custom output parser for comma-separated lists:" ] }, { "cell_type": "code", "execution_count": 6, "id": "29f55c38", "metadata": {}, "outputs": [], "source": [ "from typing import Iterator, List\n", "\n", "prompt = ChatPromptTemplate.from_template(\n", " \"Write a comma-separated list of 5 animals similar to: {animal}. Do not include numbers\"\n", ")\n", "model = ChatOpenAI(temperature=0.0)\n", "\n", "str_chain = prompt | model | StrOutputParser()" ] }, { "cell_type": "code", "execution_count": 7, "id": "75aa946b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "lion, tiger, wolf, gorilla, panda" ] } ], "source": [ "for chunk in str_chain.stream({\"animal\": \"bear\"}):\n", " print(chunk, end=\"\", flush=True)" ] }, { "cell_type": "code", "execution_count": 8, "id": "d002a7fe", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'lion, tiger, wolf, gorilla, panda'" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "str_chain.invoke({\"animal\": \"bear\"})" ] }, { "cell_type": "code", "execution_count": 9, "id": "f08b8a5b", "metadata": {}, "outputs": [], "source": [ "# This is a custom parser that splits an iterator of llm tokens\n", "# into a list of strings separated by commas\n", "def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:\n", " # hold partial input until we get a comma\n", " buffer = \"\"\n", " for chunk in input:\n", " # add current chunk to buffer\n", " buffer += chunk\n", " # while there are commas in the buffer\n", " while \",\" in buffer:\n", " # split buffer on comma\n", " comma_index = buffer.index(\",\")\n", " # yield everything before the comma\n", " yield [buffer[:comma_index].strip()]\n", " # save the rest for the next iteration\n", " buffer = buffer[comma_index + 1 :]\n", " # yield the last chunk\n", " yield [buffer.strip()]" ] }, { "cell_type": "code", "execution_count": 10, "id": "02e414aa", "metadata": {}, "outputs": [], "source": [ "list_chain = str_chain | split_into_list" ] }, { "cell_type": "code", "execution_count": 11, "id": "7ed8799d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['lion']\n", "['tiger']\n", "['wolf']\n", "['gorilla']\n", "['panda']\n" ] } ], "source": [ "for chunk in list_chain.stream({\"animal\": \"bear\"}):\n", " print(chunk, flush=True)" ] }, { "cell_type": "code", "execution_count": 12, "id": "9ea4ddc6", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['lion', 'tiger', 'wolf', 'gorilla', 'elephant']" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list_chain.invoke({\"animal\": \"bear\"})" ] }, { "cell_type": "markdown", "id": "96e320ed", "metadata": {}, "source": [ "## Async version" ] }, { "cell_type": "code", "execution_count": 13, "id": "569dbbef", "metadata": {}, "outputs": [], "source": [ "from typing import AsyncIterator\n", "\n", "\n", "async def asplit_into_list(\n", " input: AsyncIterator[str],\n", ") -> AsyncIterator[List[str]]: # async def\n", " buffer = \"\"\n", " async for (\n", " chunk\n", " ) in input: # `input` is a `async_generator` object, so use `async for`\n", " buffer += chunk\n", " while \",\" in buffer:\n", " comma_index = buffer.index(\",\")\n", " yield [buffer[:comma_index].strip()]\n", " buffer = buffer[comma_index + 1 :]\n", " yield [buffer.strip()]\n", "\n", "\n", "list_chain = str_chain | asplit_into_list" ] }, { "cell_type": "code", "execution_count": 14, "id": "7a76b713", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['lion']\n", "['tiger']\n", "['wolf']\n", "['gorilla']\n", "['panda']\n" ] } ], "source": [ "async for chunk in list_chain.astream({\"animal\": \"bear\"}):\n", " print(chunk, flush=True)" ] }, { "cell_type": "code", "execution_count": 15, "id": "3a650482", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['lion', 'tiger', 'wolf', 'gorilla', 'panda']" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "await list_chain.ainvoke({\"animal\": \"bear\"})" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" } }, "nbformat": 4, "nbformat_minor": 5 }