mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-03 12:07:36 +00:00
community: fixed critical bugs at Writer provider (#27879)
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import (
|
||||
Any,
|
||||
@@ -11,7 +12,6 @@ from typing import (
|
||||
Iterator,
|
||||
List,
|
||||
Literal,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
@@ -26,8 +26,6 @@ from langchain_core.callbacks import (
|
||||
from langchain_core.language_models import LanguageModelInput
|
||||
from langchain_core.language_models.chat_models import (
|
||||
BaseChatModel,
|
||||
agenerate_from_stream,
|
||||
generate_from_stream,
|
||||
)
|
||||
from langchain_core.messages import (
|
||||
AIMessage,
|
||||
@@ -40,99 +38,49 @@ from langchain_core.messages import (
|
||||
)
|
||||
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
|
||||
from langchain_core.runnables import Runnable
|
||||
from langchain_core.utils import get_from_dict_or_env
|
||||
from langchain_core.utils.function_calling import convert_to_openai_tool
|
||||
from pydantic import BaseModel, ConfigDict, Field, SecretStr
|
||||
from pydantic import BaseModel, ConfigDict, Field, SecretStr, model_validator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _convert_message_to_dict(message: BaseMessage) -> dict:
|
||||
"""Convert a LangChain message to a Writer message dict."""
|
||||
message_dict = {"role": "", "content": message.content}
|
||||
|
||||
if isinstance(message, ChatMessage):
|
||||
message_dict["role"] = message.role
|
||||
elif isinstance(message, HumanMessage):
|
||||
message_dict["role"] = "user"
|
||||
elif isinstance(message, AIMessage):
|
||||
message_dict["role"] = "assistant"
|
||||
if message.tool_calls:
|
||||
message_dict["tool_calls"] = [
|
||||
{
|
||||
"id": tool["id"],
|
||||
"type": "function",
|
||||
"function": {"name": tool["name"], "arguments": tool["args"]},
|
||||
}
|
||||
for tool in message.tool_calls
|
||||
]
|
||||
elif isinstance(message, SystemMessage):
|
||||
message_dict["role"] = "system"
|
||||
elif isinstance(message, ToolMessage):
|
||||
message_dict["role"] = "tool"
|
||||
message_dict["tool_call_id"] = message.tool_call_id
|
||||
else:
|
||||
raise ValueError(f"Got unknown message type: {type(message)}")
|
||||
|
||||
if message.name:
|
||||
message_dict["name"] = message.name
|
||||
|
||||
return message_dict
|
||||
|
||||
|
||||
def _convert_dict_to_message(response_dict: Dict[str, Any]) -> BaseMessage:
|
||||
"""Convert a Writer message dict to a LangChain message."""
|
||||
role = response_dict["role"]
|
||||
content = response_dict.get("content", "")
|
||||
|
||||
if role == "user":
|
||||
return HumanMessage(content=content)
|
||||
elif role == "assistant":
|
||||
additional_kwargs = {}
|
||||
if tool_calls := response_dict.get("tool_calls"):
|
||||
additional_kwargs["tool_calls"] = tool_calls
|
||||
return AIMessageChunk(content=content, additional_kwargs=additional_kwargs)
|
||||
elif role == "system":
|
||||
return SystemMessage(content=content)
|
||||
elif role == "tool":
|
||||
return ToolMessage(
|
||||
content=content,
|
||||
tool_call_id=response_dict["tool_call_id"],
|
||||
name=response_dict.get("name"),
|
||||
)
|
||||
else:
|
||||
return ChatMessage(content=content, role=role)
|
||||
|
||||
|
||||
class ChatWriter(BaseChatModel):
|
||||
"""Writer chat model.
|
||||
|
||||
To use, you should have the ``writer-sdk`` Python package installed, and the
|
||||
environment variable ``WRITER_API_KEY`` set with your API key.
|
||||
environment variable ``WRITER_API_KEY`` set with your API key or pass 'api_key'
|
||||
init param.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
from langchain_community.chat_models import ChatWriter
|
||||
|
||||
chat = ChatWriter(model="palmyra-x-004")
|
||||
chat = ChatWriter(
|
||||
api_key="your key"
|
||||
model="palmyra-x-004"
|
||||
)
|
||||
"""
|
||||
|
||||
client: Any = Field(default=None, exclude=True) #: :meta private:
|
||||
async_client: Any = Field(default=None, exclude=True) #: :meta private:
|
||||
|
||||
api_key: Optional[SecretStr] = Field(default=None)
|
||||
"""Writer API key."""
|
||||
|
||||
model_name: str = Field(default="palmyra-x-004", alias="model")
|
||||
"""Model name to use."""
|
||||
|
||||
temperature: float = 0.7
|
||||
"""What sampling temperature to use."""
|
||||
|
||||
model_kwargs: Dict[str, Any] = Field(default_factory=dict)
|
||||
"""Holds any model parameters valid for `create` call not explicitly specified."""
|
||||
writer_api_key: Optional[SecretStr] = Field(default=None, alias="api_key")
|
||||
"""Writer API key."""
|
||||
writer_api_base: Optional[str] = Field(default=None, alias="base_url")
|
||||
"""Base URL for API requests."""
|
||||
streaming: bool = False
|
||||
"""Whether to stream the results or not."""
|
||||
|
||||
n: int = 1
|
||||
"""Number of chat completions to generate for each prompt."""
|
||||
|
||||
max_tokens: Optional[int] = None
|
||||
"""Maximum number of tokens to generate."""
|
||||
|
||||
@@ -149,37 +97,159 @@ class ChatWriter(BaseChatModel):
|
||||
return {
|
||||
"model_name": self.model_name,
|
||||
"temperature": self.temperature,
|
||||
"streaming": self.streaming,
|
||||
**self.model_kwargs,
|
||||
}
|
||||
|
||||
def _create_chat_result(self, response: Mapping[str, Any]) -> ChatResult:
|
||||
@property
|
||||
def _default_params(self) -> Dict[str, Any]:
|
||||
"""Get the default parameters for calling Writer API."""
|
||||
return {
|
||||
"model": self.model_name,
|
||||
"temperature": self.temperature,
|
||||
"n": self.n,
|
||||
"max_tokens": self.max_tokens,
|
||||
**self.model_kwargs,
|
||||
}
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_environment(cls, values: Dict) -> Any:
|
||||
"""Validates that api key is passed and creates Writer clients."""
|
||||
try:
|
||||
from writerai import AsyncClient, Client
|
||||
except ImportError as e:
|
||||
raise ImportError(
|
||||
"Could not import writerai python package. "
|
||||
"Please install it with `pip install writerai`."
|
||||
) from e
|
||||
|
||||
if not values.get("client"):
|
||||
values.update(
|
||||
{
|
||||
"client": Client(
|
||||
api_key=get_from_dict_or_env(
|
||||
values, "api_key", "WRITER_API_KEY"
|
||||
)
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
if not values.get("async_client"):
|
||||
values.update(
|
||||
{
|
||||
"async_client": AsyncClient(
|
||||
api_key=get_from_dict_or_env(
|
||||
values, "api_key", "WRITER_API_KEY"
|
||||
)
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
if not (
|
||||
type(values.get("client")) is Client
|
||||
and type(values.get("async_client")) is AsyncClient
|
||||
):
|
||||
raise ValueError(
|
||||
"'client' attribute must be with type 'Client' and "
|
||||
"'async_client' must be with type 'AsyncClient' from 'writerai' package"
|
||||
)
|
||||
|
||||
return values
|
||||
|
||||
def _create_chat_result(self, response: Any) -> ChatResult:
|
||||
generations = []
|
||||
for choice in response["choices"]:
|
||||
message = _convert_dict_to_message(choice["message"])
|
||||
for choice in response.choices:
|
||||
message = self._convert_writer_to_langchain(choice.message)
|
||||
gen = ChatGeneration(
|
||||
message=message,
|
||||
generation_info=dict(finish_reason=choice.get("finish_reason")),
|
||||
generation_info=dict(finish_reason=choice.finish_reason),
|
||||
)
|
||||
generations.append(gen)
|
||||
|
||||
token_usage = response.get("usage", {})
|
||||
token_usage = {}
|
||||
|
||||
if response.usage:
|
||||
token_usage = response.usage.__dict__
|
||||
llm_output = {
|
||||
"token_usage": token_usage,
|
||||
"model_name": self.model_name,
|
||||
"system_fingerprint": response.get("system_fingerprint", ""),
|
||||
"system_fingerprint": response.system_fingerprint,
|
||||
}
|
||||
|
||||
return ChatResult(generations=generations, llm_output=llm_output)
|
||||
|
||||
def _convert_messages_to_dicts(
|
||||
@staticmethod
|
||||
def _convert_langchain_to_writer(message: BaseMessage) -> dict:
|
||||
"""Convert a LangChain message to a Writer message dict."""
|
||||
message_dict = {"role": "", "content": message.content}
|
||||
|
||||
if isinstance(message, ChatMessage):
|
||||
message_dict["role"] = message.role
|
||||
elif isinstance(message, HumanMessage):
|
||||
message_dict["role"] = "user"
|
||||
elif isinstance(message, AIMessage):
|
||||
message_dict["role"] = "assistant"
|
||||
if message.tool_calls:
|
||||
message_dict["tool_calls"] = [
|
||||
{
|
||||
"id": tool["id"],
|
||||
"type": "function",
|
||||
"function": {"name": tool["name"], "arguments": tool["args"]},
|
||||
}
|
||||
for tool in message.tool_calls
|
||||
]
|
||||
elif isinstance(message, SystemMessage):
|
||||
message_dict["role"] = "system"
|
||||
elif isinstance(message, ToolMessage):
|
||||
message_dict["role"] = "tool"
|
||||
message_dict["tool_call_id"] = message.tool_call_id
|
||||
else:
|
||||
raise ValueError(f"Got unknown message type: {type(message)}")
|
||||
|
||||
if message.name:
|
||||
message_dict["name"] = message.name
|
||||
|
||||
return message_dict
|
||||
|
||||
@staticmethod
|
||||
def _convert_writer_to_langchain(response_message: Any) -> BaseMessage:
|
||||
"""Convert a Writer message to a LangChain message."""
|
||||
if not isinstance(response_message, dict):
|
||||
response_message = json.loads(
|
||||
json.dumps(response_message, default=lambda o: o.__dict__)
|
||||
)
|
||||
|
||||
role = response_message.get("role", "")
|
||||
content = response_message.get("content")
|
||||
if not content:
|
||||
content = ""
|
||||
|
||||
if role == "user":
|
||||
return HumanMessage(content=content)
|
||||
elif role == "assistant":
|
||||
additional_kwargs = {}
|
||||
if tool_calls := response_message.get("tool_calls", []):
|
||||
additional_kwargs["tool_calls"] = tool_calls
|
||||
return AIMessageChunk(content=content, additional_kwargs=additional_kwargs)
|
||||
elif role == "system":
|
||||
return SystemMessage(content=content)
|
||||
elif role == "tool":
|
||||
return ToolMessage(
|
||||
content=content,
|
||||
tool_call_id=response_message.get("tool_call_id", ""),
|
||||
name=response_message.get("name", ""),
|
||||
)
|
||||
else:
|
||||
return ChatMessage(content=content, role=role)
|
||||
|
||||
def _convert_messages_to_writer(
|
||||
self, messages: List[BaseMessage], stop: Optional[List[str]] = None
|
||||
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
|
||||
"""Convert a list of LangChain messages to List of Writer dicts."""
|
||||
params = {
|
||||
"model": self.model_name,
|
||||
"temperature": self.temperature,
|
||||
"n": self.n,
|
||||
"stream": self.streaming,
|
||||
**self.model_kwargs,
|
||||
}
|
||||
if stop:
|
||||
@@ -187,7 +257,7 @@ class ChatWriter(BaseChatModel):
|
||||
if self.max_tokens is not None:
|
||||
params["max_tokens"] = self.max_tokens
|
||||
|
||||
message_dicts = [_convert_message_to_dict(m) for m in messages]
|
||||
message_dicts = [self._convert_langchain_to_writer(m) for m in messages]
|
||||
return message_dicts, params
|
||||
|
||||
def _stream(
|
||||
@@ -197,17 +267,17 @@ class ChatWriter(BaseChatModel):
|
||||
run_manager: Optional[CallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> Iterator[ChatGenerationChunk]:
|
||||
message_dicts, params = self._convert_messages_to_dicts(messages, stop)
|
||||
message_dicts, params = self._convert_messages_to_writer(messages, stop)
|
||||
params = {**params, **kwargs, "stream": True}
|
||||
|
||||
response = self.client.chat.chat(messages=message_dicts, **params)
|
||||
|
||||
for chunk in response:
|
||||
delta = chunk["choices"][0].get("delta")
|
||||
if not delta or not delta.get("content"):
|
||||
delta = chunk.choices[0].delta
|
||||
if not delta or not delta.content:
|
||||
continue
|
||||
chunk = _convert_dict_to_message(
|
||||
{"role": "assistant", "content": delta["content"]}
|
||||
chunk = self._convert_writer_to_langchain(
|
||||
{"role": "assistant", "content": delta.content}
|
||||
)
|
||||
chunk = ChatGenerationChunk(message=chunk)
|
||||
|
||||
@@ -223,17 +293,17 @@ class ChatWriter(BaseChatModel):
|
||||
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> AsyncIterator[ChatGenerationChunk]:
|
||||
message_dicts, params = self._convert_messages_to_dicts(messages, stop)
|
||||
message_dicts, params = self._convert_messages_to_writer(messages, stop)
|
||||
params = {**params, **kwargs, "stream": True}
|
||||
|
||||
response = await self.async_client.chat.chat(messages=message_dicts, **params)
|
||||
|
||||
async for chunk in response:
|
||||
delta = chunk["choices"][0].get("delta")
|
||||
if not delta or not delta.get("content"):
|
||||
delta = chunk.choices[0].delta
|
||||
if not delta or not delta.content:
|
||||
continue
|
||||
chunk = _convert_dict_to_message(
|
||||
{"role": "assistant", "content": delta["content"]}
|
||||
chunk = self._convert_writer_to_langchain(
|
||||
{"role": "assistant", "content": delta.content}
|
||||
)
|
||||
chunk = ChatGenerationChunk(message=chunk)
|
||||
|
||||
@@ -249,12 +319,7 @@ class ChatWriter(BaseChatModel):
|
||||
run_manager: Optional[CallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> ChatResult:
|
||||
if self.streaming:
|
||||
return generate_from_stream(
|
||||
self._stream(messages, stop, run_manager, **kwargs)
|
||||
)
|
||||
|
||||
message_dicts, params = self._convert_messages_to_dicts(messages, stop)
|
||||
message_dicts, params = self._convert_messages_to_writer(messages, stop)
|
||||
params = {**params, **kwargs}
|
||||
response = self.client.chat.chat(messages=message_dicts, **params)
|
||||
return self._create_chat_result(response)
|
||||
@@ -266,28 +331,11 @@ class ChatWriter(BaseChatModel):
|
||||
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> ChatResult:
|
||||
if self.streaming:
|
||||
return await agenerate_from_stream(
|
||||
self._astream(messages, stop, run_manager, **kwargs)
|
||||
)
|
||||
|
||||
message_dicts, params = self._convert_messages_to_dicts(messages, stop)
|
||||
message_dicts, params = self._convert_messages_to_writer(messages, stop)
|
||||
params = {**params, **kwargs}
|
||||
response = await self.async_client.chat.chat(messages=message_dicts, **params)
|
||||
return self._create_chat_result(response)
|
||||
|
||||
@property
|
||||
def _default_params(self) -> Dict[str, Any]:
|
||||
"""Get the default parameters for calling Writer API."""
|
||||
return {
|
||||
"model": self.model_name,
|
||||
"temperature": self.temperature,
|
||||
"stream": self.streaming,
|
||||
"n": self.n,
|
||||
"max_tokens": self.max_tokens,
|
||||
**self.model_kwargs,
|
||||
}
|
||||
|
||||
def bind_tools(
|
||||
self,
|
||||
tools: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable]],
|
||||
|
@@ -1,108 +1,89 @@
|
||||
from typing import Any, Dict, List, Mapping, Optional
|
||||
from typing import Any, AsyncIterator, Dict, Iterator, List, Mapping, Optional
|
||||
|
||||
import requests
|
||||
from langchain_core.callbacks import CallbackManagerForLLMRun
|
||||
from langchain_core.callbacks import (
|
||||
AsyncCallbackManagerForLLMRun,
|
||||
CallbackManagerForLLMRun,
|
||||
)
|
||||
from langchain_core.language_models.llms import LLM
|
||||
from langchain_core.utils import get_from_dict_or_env, pre_init
|
||||
from pydantic import ConfigDict
|
||||
|
||||
from langchain_community.llms.utils import enforce_stop_tokens
|
||||
from langchain_core.outputs import GenerationChunk
|
||||
from langchain_core.utils import get_from_dict_or_env
|
||||
from pydantic import ConfigDict, Field, SecretStr, model_validator
|
||||
|
||||
|
||||
class Writer(LLM):
|
||||
"""Writer large language models.
|
||||
|
||||
To use, you should have the environment variable ``WRITER_API_KEY`` and
|
||||
``WRITER_ORG_ID`` set with your API key and organization ID respectively.
|
||||
To use, you should have the ``writer-sdk`` Python package installed, and the
|
||||
environment variable ``WRITER_API_KEY`` set with your API key.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
from langchain_community.llms import Writer
|
||||
writer = Writer(model_id="palmyra-base")
|
||||
from langchain_community.llms import Writer as WriterLLM
|
||||
from writerai import Writer, AsyncWriter
|
||||
|
||||
client = Writer()
|
||||
async_client = AsyncWriter()
|
||||
|
||||
chat = WriterLLM(
|
||||
client=client,
|
||||
async_client=async_client
|
||||
)
|
||||
"""
|
||||
|
||||
writer_org_id: Optional[str] = None
|
||||
"""Writer organization ID."""
|
||||
client: Any = Field(default=None, exclude=True) #: :meta private:
|
||||
async_client: Any = Field(default=None, exclude=True) #: :meta private:
|
||||
|
||||
model_id: str = "palmyra-instruct"
|
||||
"""Model name to use."""
|
||||
|
||||
min_tokens: Optional[int] = None
|
||||
"""Minimum number of tokens to generate."""
|
||||
|
||||
max_tokens: Optional[int] = None
|
||||
"""Maximum number of tokens to generate."""
|
||||
|
||||
temperature: Optional[float] = None
|
||||
"""What sampling temperature to use."""
|
||||
|
||||
top_p: Optional[float] = None
|
||||
"""Total probability mass of tokens to consider at each step."""
|
||||
|
||||
stop: Optional[List[str]] = None
|
||||
"""Sequences when completion generation will stop."""
|
||||
|
||||
presence_penalty: Optional[float] = None
|
||||
"""Penalizes repeated tokens regardless of frequency."""
|
||||
|
||||
repetition_penalty: Optional[float] = None
|
||||
"""Penalizes repeated tokens according to frequency."""
|
||||
|
||||
best_of: Optional[int] = None
|
||||
"""Generates this many completions server-side and returns the "best"."""
|
||||
|
||||
logprobs: bool = False
|
||||
"""Whether to return log probabilities."""
|
||||
|
||||
n: Optional[int] = None
|
||||
"""How many completions to generate."""
|
||||
|
||||
writer_api_key: Optional[str] = None
|
||||
api_key: Optional[SecretStr] = Field(default=None)
|
||||
"""Writer API key."""
|
||||
|
||||
base_url: Optional[str] = None
|
||||
"""Base url to use, if None decides based on model name."""
|
||||
model_name: str = Field(default="palmyra-x-003-instruct", alias="model")
|
||||
"""Model name to use."""
|
||||
|
||||
model_config = ConfigDict(
|
||||
extra="forbid",
|
||||
)
|
||||
max_tokens: Optional[int] = None
|
||||
"""The maximum number of tokens that the model can generate in the response."""
|
||||
|
||||
@pre_init
|
||||
def validate_environment(cls, values: Dict) -> Dict:
|
||||
"""Validate that api key and organization id exist in environment."""
|
||||
temperature: Optional[float] = 0.7
|
||||
"""Controls the randomness of the model's outputs. Higher values lead to more
|
||||
random outputs, while lower values make the model more deterministic."""
|
||||
|
||||
writer_api_key = get_from_dict_or_env(
|
||||
values, "writer_api_key", "WRITER_API_KEY"
|
||||
)
|
||||
values["writer_api_key"] = writer_api_key
|
||||
top_p: Optional[float] = None
|
||||
"""Used to control the nucleus sampling, where only the most probable tokens
|
||||
with a cumulative probability of top_p are considered for sampling, providing
|
||||
a way to fine-tune the randomness of predictions."""
|
||||
|
||||
writer_org_id = get_from_dict_or_env(values, "writer_org_id", "WRITER_ORG_ID")
|
||||
values["writer_org_id"] = writer_org_id
|
||||
stop: Optional[List[str]] = None
|
||||
"""Specifies stopping conditions for the model's output generation. This can
|
||||
be an array of strings or a single string that the model will look for as a
|
||||
signal to stop generating further tokens."""
|
||||
|
||||
return values
|
||||
best_of: Optional[int] = None
|
||||
"""Specifies the number of completions to generate and return the best one.
|
||||
Useful for generating multiple outputs and choosing the best based on some
|
||||
criteria."""
|
||||
|
||||
model_kwargs: Dict[str, Any] = Field(default_factory=dict)
|
||||
"""Holds any model parameters valid for `create` call not explicitly specified."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
@property
|
||||
def _default_params(self) -> Mapping[str, Any]:
|
||||
"""Get the default parameters for calling Writer API."""
|
||||
return {
|
||||
"minTokens": self.min_tokens,
|
||||
"maxTokens": self.max_tokens,
|
||||
"max_tokens": self.max_tokens,
|
||||
"temperature": self.temperature,
|
||||
"topP": self.top_p,
|
||||
"top_p": self.top_p,
|
||||
"stop": self.stop,
|
||||
"presencePenalty": self.presence_penalty,
|
||||
"repetitionPenalty": self.repetition_penalty,
|
||||
"bestOf": self.best_of,
|
||||
"logprobs": self.logprobs,
|
||||
"n": self.n,
|
||||
"best_of": self.best_of,
|
||||
**self.model_kwargs,
|
||||
}
|
||||
|
||||
@property
|
||||
def _identifying_params(self) -> Mapping[str, Any]:
|
||||
"""Get the identifying parameters."""
|
||||
return {
|
||||
**{"model_id": self.model_id, "writer_org_id": self.writer_org_id},
|
||||
"model": self.model_name,
|
||||
**self._default_params,
|
||||
}
|
||||
|
||||
@@ -111,6 +92,51 @@ class Writer(LLM):
|
||||
"""Return type of llm."""
|
||||
return "writer"
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_environment(cls, values: Dict) -> Any:
|
||||
"""Validates that api key is passed and creates Writer clients."""
|
||||
try:
|
||||
from writerai import AsyncClient, Client
|
||||
except ImportError as e:
|
||||
raise ImportError(
|
||||
"Could not import writerai python package. "
|
||||
"Please install it with `pip install writerai`."
|
||||
) from e
|
||||
|
||||
if not values.get("client"):
|
||||
values.update(
|
||||
{
|
||||
"client": Client(
|
||||
api_key=get_from_dict_or_env(
|
||||
values, "api_key", "WRITER_API_KEY"
|
||||
)
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
if not values.get("async_client"):
|
||||
values.update(
|
||||
{
|
||||
"async_client": AsyncClient(
|
||||
api_key=get_from_dict_or_env(
|
||||
values, "api_key", "WRITER_API_KEY"
|
||||
)
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
if not (
|
||||
type(values.get("client")) is Client
|
||||
and type(values.get("async_client")) is AsyncClient
|
||||
):
|
||||
raise ValueError(
|
||||
"'client' attribute must be with type 'Client' and "
|
||||
"'async_client' must be with type 'AsyncClient' from 'writerai' package"
|
||||
)
|
||||
|
||||
return values
|
||||
|
||||
def _call(
|
||||
self,
|
||||
prompt: str,
|
||||
@@ -118,41 +144,54 @@ class Writer(LLM):
|
||||
run_manager: Optional[CallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> str:
|
||||
"""Call out to Writer's completions endpoint.
|
||||
|
||||
Args:
|
||||
prompt: The prompt to pass into the model.
|
||||
stop: Optional list of stop words to use when generating.
|
||||
|
||||
Returns:
|
||||
The string generated by the model.
|
||||
|
||||
Example:
|
||||
.. code-block:: python
|
||||
|
||||
response = Writer("Tell me a joke.")
|
||||
"""
|
||||
if self.base_url is not None:
|
||||
base_url = self.base_url
|
||||
else:
|
||||
base_url = (
|
||||
"https://enterprise-api.writer.com/llm"
|
||||
f"/organization/{self.writer_org_id}"
|
||||
f"/model/{self.model_id}/completions"
|
||||
)
|
||||
params = {**self._default_params, **kwargs}
|
||||
response = requests.post(
|
||||
url=base_url,
|
||||
headers={
|
||||
"Authorization": f"{self.writer_api_key}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
},
|
||||
json={"prompt": prompt, **params},
|
||||
)
|
||||
text = response.text
|
||||
params = {**self._identifying_params, **kwargs}
|
||||
if stop is not None:
|
||||
# I believe this is required since the stop tokens
|
||||
# are not enforced by the model parameters
|
||||
text = enforce_stop_tokens(text, stop)
|
||||
params.update({"stop": stop})
|
||||
text = self.client.completions.create(prompt=prompt, **params).choices[0].text
|
||||
return text
|
||||
|
||||
async def _acall(
|
||||
self,
|
||||
prompt: str,
|
||||
stop: Optional[list[str]] = None,
|
||||
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> str:
|
||||
params = {**self._identifying_params, **kwargs}
|
||||
if stop is not None:
|
||||
params.update({"stop": stop})
|
||||
response = await self.async_client.completions.create(prompt=prompt, **params)
|
||||
text = response.choices[0].text
|
||||
return text
|
||||
|
||||
def _stream(
|
||||
self,
|
||||
prompt: str,
|
||||
stop: Optional[list[str]] = None,
|
||||
run_manager: Optional[CallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> Iterator[GenerationChunk]:
|
||||
params = {**self._identifying_params, **kwargs, "stream": True}
|
||||
if stop is not None:
|
||||
params.update({"stop": stop})
|
||||
response = self.client.completions.create(prompt=prompt, **params)
|
||||
for chunk in response:
|
||||
if run_manager:
|
||||
run_manager.on_llm_new_token(chunk.value)
|
||||
yield GenerationChunk(text=chunk.value)
|
||||
|
||||
async def _astream(
|
||||
self,
|
||||
prompt: str,
|
||||
stop: Optional[list[str]] = None,
|
||||
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> AsyncIterator[GenerationChunk]:
|
||||
params = {**self._identifying_params, **kwargs, "stream": True}
|
||||
if stop is not None:
|
||||
params.update({"stop": stop})
|
||||
response = await self.async_client.completions.create(prompt=prompt, **params)
|
||||
async for chunk in response:
|
||||
if run_manager:
|
||||
await run_manager.on_llm_new_token(chunk.value)
|
||||
yield GenerationChunk(text=chunk.value)
|
||||
|
@@ -20,7 +20,7 @@ count=$(git grep -E '(@root_validator)|(@validator)|(@field_validator)|(@pre_ini
|
||||
# PRs that increase the current count will not be accepted.
|
||||
# PRs that decrease update the code in the repository
|
||||
# and allow decreasing the count of are welcome!
|
||||
current_count=126
|
||||
current_count=125
|
||||
|
||||
if [ "$count" -gt "$current_count" ]; then
|
||||
echo "The PR seems to be introducing new usage of @root_validator and/or @field_validator."
|
||||
|
@@ -1,10 +0,0 @@
|
||||
"""Test Writer API wrapper."""
|
||||
|
||||
from langchain_community.llms.writer import Writer
|
||||
|
||||
|
||||
def test_writer_call() -> None:
|
||||
"""Test valid call to Writer."""
|
||||
llm = Writer()
|
||||
output = llm.invoke("Say foo:")
|
||||
assert isinstance(output, str)
|
@@ -1,61 +1,251 @@
|
||||
"""Unit tests for Writer chat model integration."""
|
||||
|
||||
import json
|
||||
from typing import Any, Dict, List
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from typing import Any, Dict, List, Literal, Optional, Tuple, Type
|
||||
from unittest import mock
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
from langchain_core.callbacks.manager import CallbackManager
|
||||
from langchain_core.language_models import BaseChatModel
|
||||
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
|
||||
from langchain_standard_tests.unit_tests import ChatModelUnitTests
|
||||
from pydantic import SecretStr
|
||||
|
||||
from langchain_community.chat_models.writer import ChatWriter, _convert_dict_to_message
|
||||
from langchain_community.chat_models.writer import ChatWriter
|
||||
from tests.unit_tests.callbacks.fake_callback_handler import FakeCallbackHandler
|
||||
|
||||
"""Classes for mocking Writer responses."""
|
||||
|
||||
|
||||
class ChoiceDelta:
|
||||
def __init__(self, content: str):
|
||||
self.content = content
|
||||
|
||||
|
||||
class ChunkChoice:
|
||||
def __init__(self, index: int, finish_reason: str, delta: ChoiceDelta):
|
||||
self.index = index
|
||||
self.finish_reason = finish_reason
|
||||
self.delta = delta
|
||||
|
||||
|
||||
class ChatCompletionChunk:
|
||||
def __init__(
|
||||
self,
|
||||
id: str,
|
||||
object: str,
|
||||
created: int,
|
||||
model: str,
|
||||
choices: List[ChunkChoice],
|
||||
):
|
||||
self.id = id
|
||||
self.object = object
|
||||
self.created = created
|
||||
self.model = model
|
||||
self.choices = choices
|
||||
|
||||
|
||||
class ToolCallFunction:
|
||||
def __init__(self, name: str, arguments: str):
|
||||
self.name = name
|
||||
self.arguments = arguments
|
||||
|
||||
|
||||
class ChoiceMessageToolCall:
|
||||
def __init__(self, id: str, type: str, function: ToolCallFunction):
|
||||
self.id = id
|
||||
self.type = type
|
||||
self.function = function
|
||||
|
||||
|
||||
class Usage:
|
||||
def __init__(
|
||||
self,
|
||||
prompt_tokens: int,
|
||||
completion_tokens: int,
|
||||
total_tokens: int,
|
||||
):
|
||||
self.prompt_tokens = prompt_tokens
|
||||
self.completion_tokens = completion_tokens
|
||||
self.total_tokens = total_tokens
|
||||
|
||||
|
||||
class ChoiceMessage:
|
||||
def __init__(
|
||||
self,
|
||||
role: str,
|
||||
content: str,
|
||||
tool_calls: Optional[List[ChoiceMessageToolCall]] = None,
|
||||
):
|
||||
self.role = role
|
||||
self.content = content
|
||||
self.tool_calls = tool_calls
|
||||
|
||||
|
||||
class Choice:
|
||||
def __init__(self, index: int, finish_reason: str, message: ChoiceMessage):
|
||||
self.index = index
|
||||
self.finish_reason = finish_reason
|
||||
self.message = message
|
||||
|
||||
|
||||
class Chat:
|
||||
def __init__(
|
||||
self,
|
||||
id: str,
|
||||
object: str,
|
||||
created: int,
|
||||
system_fingerprint: str,
|
||||
model: str,
|
||||
usage: Usage,
|
||||
choices: List[Choice],
|
||||
):
|
||||
self.id = id
|
||||
self.object = object
|
||||
self.created = created
|
||||
self.system_fingerprint = system_fingerprint
|
||||
self.model = model
|
||||
self.usage = usage
|
||||
self.choices = choices
|
||||
|
||||
|
||||
@pytest.mark.requires("writerai")
|
||||
class TestChatWriterCustom:
|
||||
"""Test case for ChatWriter"""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_unstreaming_completion(self) -> Chat:
|
||||
"""Fixture providing a mock API response."""
|
||||
return Chat(
|
||||
id="chat-12345",
|
||||
object="chat.completion",
|
||||
created=1699000000,
|
||||
model="palmyra-x-004",
|
||||
system_fingerprint="v1",
|
||||
usage=Usage(prompt_tokens=10, completion_tokens=8, total_tokens=18),
|
||||
choices=[
|
||||
Choice(
|
||||
index=0,
|
||||
finish_reason="stop",
|
||||
message=ChoiceMessage(
|
||||
role="assistant",
|
||||
content="Hello! How can I help you?",
|
||||
),
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_tool_call_choice_response(self) -> Chat:
|
||||
return Chat(
|
||||
id="chat-12345",
|
||||
object="chat.completion",
|
||||
created=1699000000,
|
||||
model="palmyra-x-004",
|
||||
system_fingerprint="v1",
|
||||
usage=Usage(prompt_tokens=29, completion_tokens=32, total_tokens=61),
|
||||
choices=[
|
||||
Choice(
|
||||
index=0,
|
||||
finish_reason="tool_calls",
|
||||
message=ChoiceMessage(
|
||||
role="assistant",
|
||||
content="",
|
||||
tool_calls=[
|
||||
ChoiceMessageToolCall(
|
||||
id="call_abc123",
|
||||
type="function",
|
||||
function=ToolCallFunction(
|
||||
name="GetWeather",
|
||||
arguments='{"location": "London"}',
|
||||
),
|
||||
)
|
||||
],
|
||||
),
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_streaming_chunks(self) -> List[ChatCompletionChunk]:
|
||||
"""Fixture providing mock streaming response chunks."""
|
||||
return [
|
||||
ChatCompletionChunk(
|
||||
id="chat-12345",
|
||||
object="chat.completion",
|
||||
created=1699000000,
|
||||
model="palmyra-x-004",
|
||||
choices=[
|
||||
ChunkChoice(
|
||||
index=0,
|
||||
finish_reason="stop",
|
||||
delta=ChoiceDelta(content="Hello! "),
|
||||
)
|
||||
],
|
||||
),
|
||||
ChatCompletionChunk(
|
||||
id="chat-12345",
|
||||
object="chat.completion",
|
||||
created=1699000000,
|
||||
model="palmyra-x-004",
|
||||
choices=[
|
||||
ChunkChoice(
|
||||
index=0,
|
||||
finish_reason="stop",
|
||||
delta=ChoiceDelta(content="How can I help you?"),
|
||||
)
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
class TestChatWriter:
|
||||
def test_writer_model_param(self) -> None:
|
||||
"""Test different ways to initialize the chat model."""
|
||||
test_cases: List[dict] = [
|
||||
{"model_name": "palmyra-x-004", "writer_api_key": "test-key"},
|
||||
{"model": "palmyra-x-004", "writer_api_key": "test-key"},
|
||||
{"model_name": "palmyra-x-004", "writer_api_key": "test-key"},
|
||||
{
|
||||
"model_name": "palmyra-x-004",
|
||||
"api_key": "key",
|
||||
},
|
||||
{
|
||||
"model": "palmyra-x-004",
|
||||
"api_key": "key",
|
||||
},
|
||||
{
|
||||
"model_name": "palmyra-x-004",
|
||||
"api_key": "key",
|
||||
},
|
||||
{
|
||||
"model": "palmyra-x-004",
|
||||
"writer_api_key": "test-key",
|
||||
"temperature": 0.5,
|
||||
"api_key": "key",
|
||||
},
|
||||
]
|
||||
|
||||
for case in test_cases:
|
||||
chat = ChatWriter(**case)
|
||||
assert chat.model_name == "palmyra-x-004"
|
||||
assert chat.writer_api_key
|
||||
assert chat.writer_api_key.get_secret_value() == "test-key"
|
||||
assert chat.temperature == (0.5 if "temperature" in case else 0.7)
|
||||
|
||||
def test_convert_dict_to_message_human(self) -> None:
|
||||
def test_convert_writer_to_langchain_human(self) -> None:
|
||||
"""Test converting a human message dict to a LangChain message."""
|
||||
message = {"role": "user", "content": "Hello"}
|
||||
result = _convert_dict_to_message(message)
|
||||
result = ChatWriter._convert_writer_to_langchain(message)
|
||||
assert isinstance(result, HumanMessage)
|
||||
assert result.content == "Hello"
|
||||
|
||||
def test_convert_dict_to_message_ai(self) -> None:
|
||||
def test_convert_writer_to_langchain_ai(self) -> None:
|
||||
"""Test converting an AI message dict to a LangChain message."""
|
||||
message = {"role": "assistant", "content": "Hello"}
|
||||
result = _convert_dict_to_message(message)
|
||||
result = ChatWriter._convert_writer_to_langchain(message)
|
||||
assert isinstance(result, AIMessage)
|
||||
assert result.content == "Hello"
|
||||
|
||||
def test_convert_dict_to_message_system(self) -> None:
|
||||
def test_convert_writer_to_langchain_system(self) -> None:
|
||||
"""Test converting a system message dict to a LangChain message."""
|
||||
message = {"role": "system", "content": "You are a helpful assistant"}
|
||||
result = _convert_dict_to_message(message)
|
||||
result = ChatWriter._convert_writer_to_langchain(message)
|
||||
assert isinstance(result, SystemMessage)
|
||||
assert result.content == "You are a helpful assistant"
|
||||
|
||||
def test_convert_dict_to_message_tool_call(self) -> None:
|
||||
def test_convert_writer_to_langchain_tool_call(self) -> None:
|
||||
"""Test converting a tool call message dict to a LangChain message."""
|
||||
content = json.dumps({"result": 42})
|
||||
message = {
|
||||
@@ -64,12 +254,12 @@ class TestChatWriter:
|
||||
"content": content,
|
||||
"tool_call_id": "call_abc123",
|
||||
}
|
||||
result = _convert_dict_to_message(message)
|
||||
result = ChatWriter._convert_writer_to_langchain(message)
|
||||
assert isinstance(result, ToolMessage)
|
||||
assert result.name == "get_number"
|
||||
assert result.content == content
|
||||
|
||||
def test_convert_dict_to_message_with_tool_calls(self) -> None:
|
||||
def test_convert_writer_to_langchain_with_tool_calls(self) -> None:
|
||||
"""Test converting an AIMessage with tool calls."""
|
||||
message = {
|
||||
"role": "assistant",
|
||||
@@ -85,131 +275,55 @@ class TestChatWriter:
|
||||
}
|
||||
],
|
||||
}
|
||||
result = _convert_dict_to_message(message)
|
||||
result = ChatWriter._convert_writer_to_langchain(message)
|
||||
assert isinstance(result, AIMessage)
|
||||
assert result.tool_calls
|
||||
assert len(result.tool_calls) == 1
|
||||
assert result.tool_calls[0]["name"] == "get_weather"
|
||||
assert result.tool_calls[0]["args"]["location"] == "London"
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_completion(self) -> Dict[str, Any]:
|
||||
"""Fixture providing a mock API response."""
|
||||
return {
|
||||
"id": "chat-12345",
|
||||
"object": "chat.completion",
|
||||
"created": 1699000000,
|
||||
"model": "palmyra-x-004",
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "Hello! How can I help you?",
|
||||
},
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
],
|
||||
"usage": {"prompt_tokens": 10, "completion_tokens": 8, "total_tokens": 18},
|
||||
}
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_response(self) -> Dict[str, Any]:
|
||||
response = {
|
||||
"id": "chat-12345",
|
||||
"choices": [
|
||||
{
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": "call_abc123",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "GetWeather",
|
||||
"arguments": '{"location": "London"}',
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
"finish_reason": "tool_calls",
|
||||
}
|
||||
],
|
||||
}
|
||||
return response
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_streaming_chunks(self) -> List[Dict[str, Any]]:
|
||||
"""Fixture providing mock streaming response chunks."""
|
||||
return [
|
||||
{
|
||||
"id": "chat-12345",
|
||||
"object": "chat.completion.chunk",
|
||||
"created": 1699000000,
|
||||
"model": "palmyra-x-004",
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"delta": {
|
||||
"role": "assistant",
|
||||
"content": "Hello",
|
||||
},
|
||||
"finish_reason": None,
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "chat-12345",
|
||||
"object": "chat.completion.chunk",
|
||||
"created": 1699000000,
|
||||
"model": "palmyra-x-004",
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"delta": {
|
||||
"content": "!",
|
||||
},
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
def test_sync_completion(self, mock_completion: Dict[str, Any]) -> None:
|
||||
def test_sync_completion(
|
||||
self, mock_unstreaming_completion: List[ChatCompletionChunk]
|
||||
) -> None:
|
||||
"""Test basic chat completion with mocked response."""
|
||||
chat = ChatWriter(api_key=SecretStr("test-key"))
|
||||
mock_client = MagicMock()
|
||||
mock_client.chat.chat.return_value = mock_completion
|
||||
chat = ChatWriter(api_key=SecretStr("key"))
|
||||
|
||||
with patch.object(chat, "client", mock_client):
|
||||
mock_client = MagicMock()
|
||||
mock_client.chat.chat.return_value = mock_unstreaming_completion
|
||||
|
||||
with mock.patch.object(chat, "client", mock_client):
|
||||
message = HumanMessage(content="Hi there!")
|
||||
response = chat.invoke([message])
|
||||
assert isinstance(response, AIMessage)
|
||||
assert response.content == "Hello! How can I help you?"
|
||||
|
||||
async def test_async_completion(self, mock_completion: Dict[str, Any]) -> None:
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_completion(
|
||||
self, mock_unstreaming_completion: List[ChatCompletionChunk]
|
||||
) -> None:
|
||||
"""Test async chat completion with mocked response."""
|
||||
chat = ChatWriter(api_key=SecretStr("test-key"))
|
||||
mock_client = AsyncMock()
|
||||
mock_client.chat.chat.return_value = mock_completion
|
||||
chat = ChatWriter(api_key=SecretStr("key"))
|
||||
|
||||
with patch.object(chat, "async_client", mock_client):
|
||||
mock_async_client = AsyncMock()
|
||||
mock_async_client.chat.chat.return_value = mock_unstreaming_completion
|
||||
|
||||
with mock.patch.object(chat, "async_client", mock_async_client):
|
||||
message = HumanMessage(content="Hi there!")
|
||||
response = await chat.ainvoke([message])
|
||||
assert isinstance(response, AIMessage)
|
||||
assert response.content == "Hello! How can I help you?"
|
||||
|
||||
def test_sync_streaming(self, mock_streaming_chunks: List[Dict[str, Any]]) -> None:
|
||||
def test_sync_streaming(
|
||||
self, mock_streaming_chunks: List[ChatCompletionChunk]
|
||||
) -> None:
|
||||
"""Test sync streaming with callback handler."""
|
||||
callback_handler = FakeCallbackHandler()
|
||||
callback_manager = CallbackManager([callback_handler])
|
||||
|
||||
chat = ChatWriter(
|
||||
streaming=True,
|
||||
api_key=SecretStr("key"),
|
||||
callback_manager=callback_manager,
|
||||
max_tokens=10,
|
||||
api_key=SecretStr("test-key"),
|
||||
)
|
||||
|
||||
mock_client = MagicMock()
|
||||
@@ -217,42 +331,46 @@ class TestChatWriter:
|
||||
mock_response.__iter__.return_value = mock_streaming_chunks
|
||||
mock_client.chat.chat.return_value = mock_response
|
||||
|
||||
with patch.object(chat, "client", mock_client):
|
||||
with mock.patch.object(chat, "client", mock_client):
|
||||
message = HumanMessage(content="Hi")
|
||||
response = chat.invoke([message])
|
||||
|
||||
assert isinstance(response, AIMessage)
|
||||
response = chat.stream([message])
|
||||
response_message = ""
|
||||
for chunk in response:
|
||||
response_message += str(chunk.content)
|
||||
assert callback_handler.llm_streams > 0
|
||||
assert response.content == "Hello!"
|
||||
assert response_message == "Hello! How can I help you?"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_streaming(
|
||||
self, mock_streaming_chunks: List[Dict[str, Any]]
|
||||
self, mock_streaming_chunks: List[ChatCompletionChunk]
|
||||
) -> None:
|
||||
"""Test async streaming with callback handler."""
|
||||
callback_handler = FakeCallbackHandler()
|
||||
callback_manager = CallbackManager([callback_handler])
|
||||
|
||||
chat = ChatWriter(
|
||||
streaming=True,
|
||||
api_key=SecretStr("key"),
|
||||
callback_manager=callback_manager,
|
||||
max_tokens=10,
|
||||
api_key=SecretStr("test-key"),
|
||||
)
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_async_client = AsyncMock()
|
||||
mock_response = AsyncMock()
|
||||
mock_response.__aiter__.return_value = mock_streaming_chunks
|
||||
mock_client.chat.chat.return_value = mock_response
|
||||
mock_async_client.chat.chat.return_value = mock_response
|
||||
|
||||
with patch.object(chat, "async_client", mock_client):
|
||||
with mock.patch.object(chat, "async_client", mock_async_client):
|
||||
message = HumanMessage(content="Hi")
|
||||
response = await chat.ainvoke([message])
|
||||
|
||||
assert isinstance(response, AIMessage)
|
||||
response = chat.astream([message])
|
||||
response_message = ""
|
||||
async for chunk in response:
|
||||
response_message += str(chunk.content)
|
||||
assert callback_handler.llm_streams > 0
|
||||
assert response.content == "Hello!"
|
||||
assert response_message == "Hello! How can I help you?"
|
||||
|
||||
def test_sync_tool_calling(self, mock_response: Dict[str, Any]) -> None:
|
||||
def test_sync_tool_calling(
|
||||
self, mock_tool_call_choice_response: Dict[str, Any]
|
||||
) -> None:
|
||||
"""Test synchronous tool calling functionality."""
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
@@ -261,23 +379,27 @@ class TestChatWriter:
|
||||
|
||||
location: str = Field(..., description="The location to get weather for")
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.chat.chat.return_value = mock_response
|
||||
chat = ChatWriter(api_key=SecretStr("key"))
|
||||
|
||||
chat = ChatWriter(api_key=SecretStr("test-key"), client=mock_client)
|
||||
mock_client = MagicMock()
|
||||
mock_client.chat.chat.return_value = mock_tool_call_choice_response
|
||||
|
||||
chat_with_tools = chat.bind_tools(
|
||||
tools=[GetWeather],
|
||||
tool_choice="GetWeather",
|
||||
)
|
||||
|
||||
response = chat_with_tools.invoke("What's the weather in London?")
|
||||
assert isinstance(response, AIMessage)
|
||||
assert response.tool_calls
|
||||
assert response.tool_calls[0]["name"] == "GetWeather"
|
||||
assert response.tool_calls[0]["args"]["location"] == "London"
|
||||
with mock.patch.object(chat, "client", mock_client):
|
||||
response = chat_with_tools.invoke("What's the weather in London?")
|
||||
assert isinstance(response, AIMessage)
|
||||
assert response.tool_calls
|
||||
assert response.tool_calls[0]["name"] == "GetWeather"
|
||||
assert response.tool_calls[0]["args"]["location"] == "London"
|
||||
|
||||
async def test_async_tool_calling(self, mock_response: Dict[str, Any]) -> None:
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_tool_calling(
|
||||
self, mock_tool_call_choice_response: Dict[str, Any]
|
||||
) -> None:
|
||||
"""Test asynchronous tool calling functionality."""
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
@@ -286,18 +408,101 @@ class TestChatWriter:
|
||||
|
||||
location: str = Field(..., description="The location to get weather for")
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.chat.chat.return_value = mock_response
|
||||
mock_async_client = AsyncMock()
|
||||
mock_async_client.chat.chat.return_value = mock_tool_call_choice_response
|
||||
|
||||
chat = ChatWriter(api_key=SecretStr("test-key"), async_client=mock_client)
|
||||
chat = ChatWriter(api_key=SecretStr("key"))
|
||||
|
||||
chat_with_tools = chat.bind_tools(
|
||||
tools=[GetWeather],
|
||||
tool_choice="GetWeather",
|
||||
)
|
||||
|
||||
response = await chat_with_tools.ainvoke("What's the weather in London?")
|
||||
assert isinstance(response, AIMessage)
|
||||
assert response.tool_calls
|
||||
assert response.tool_calls[0]["name"] == "GetWeather"
|
||||
assert response.tool_calls[0]["args"]["location"] == "London"
|
||||
with mock.patch.object(chat, "async_client", mock_async_client):
|
||||
response = await chat_with_tools.ainvoke("What's the weather in London?")
|
||||
assert isinstance(response, AIMessage)
|
||||
assert response.tool_calls
|
||||
assert response.tool_calls[0]["name"] == "GetWeather"
|
||||
assert response.tool_calls[0]["args"]["location"] == "London"
|
||||
|
||||
|
||||
@pytest.mark.requires("writerai")
|
||||
class TestChatWriterStandart(ChatModelUnitTests):
|
||||
"""Test case for ChatWriter that inherits from standard LangChain tests."""
|
||||
|
||||
@property
|
||||
def chat_model_class(self) -> Type[BaseChatModel]:
|
||||
"""Return ChatWriter model class."""
|
||||
return ChatWriter
|
||||
|
||||
@property
|
||||
def chat_model_params(self) -> Dict:
|
||||
"""Return any additional parameters needed."""
|
||||
return {
|
||||
"api_key": "fake-api-key",
|
||||
"model_name": "palmyra-x-004",
|
||||
}
|
||||
|
||||
@property
|
||||
def has_tool_calling(self) -> bool:
|
||||
"""Writer supports tool/function calling."""
|
||||
return True
|
||||
|
||||
@property
|
||||
def tool_choice_value(self) -> Optional[str]:
|
||||
"""Value to use for tool choice in tests."""
|
||||
return "auto"
|
||||
|
||||
@property
|
||||
def has_structured_output(self) -> bool:
|
||||
"""Writer does not yet support structured output."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def supports_image_inputs(self) -> bool:
|
||||
"""Writer does not support image inputs."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def supports_video_inputs(self) -> bool:
|
||||
"""Writer does not support video inputs."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def returns_usage_metadata(self) -> bool:
|
||||
"""Writer returns token usage information."""
|
||||
return True
|
||||
|
||||
@property
|
||||
def supports_anthropic_inputs(self) -> bool:
|
||||
"""Writer does not support anthropic inputs."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def supports_image_tool_message(self) -> bool:
|
||||
"""Writer does not support image tool message."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def supported_usage_metadata_details(
|
||||
self,
|
||||
) -> Dict[
|
||||
Literal["invoke", "stream"],
|
||||
List[
|
||||
Literal[
|
||||
"audio_input",
|
||||
"audio_output",
|
||||
"reasoning_output",
|
||||
"cache_read_input",
|
||||
"cache_creation_input",
|
||||
]
|
||||
],
|
||||
]:
|
||||
"""Return which types of usage metadata your model supports."""
|
||||
return {"invoke": ["cache_creation_input"], "stream": ["reasoning_output"]}
|
||||
|
||||
@property
|
||||
def init_from_env_params(self) -> Tuple[dict, dict, dict]:
|
||||
"""Return env vars, init args, and expected instance attrs for initializing
|
||||
from env vars."""
|
||||
return {"WRITER_API_KEY": "key"}, {"api_key": "key"}, {"api_key": "key"}
|
||||
|
202
libs/community/tests/unit_tests/llms/test_writer.py
Normal file
202
libs/community/tests/unit_tests/llms/test_writer.py
Normal file
@@ -0,0 +1,202 @@
|
||||
from typing import List
|
||||
from unittest import mock
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
from langchain_core.callbacks import CallbackManager
|
||||
from pydantic import SecretStr
|
||||
|
||||
from langchain_community.llms.writer import Writer
|
||||
from tests.unit_tests.callbacks.fake_callback_handler import FakeCallbackHandler
|
||||
|
||||
"""Classes for mocking Writer responses."""
|
||||
|
||||
|
||||
class Choice:
|
||||
def __init__(self, text: str):
|
||||
self.text = text
|
||||
|
||||
|
||||
class Completion:
|
||||
def __init__(self, choices: List[Choice]):
|
||||
self.choices = choices
|
||||
|
||||
|
||||
class StreamingData:
|
||||
def __init__(self, value: str):
|
||||
self.value = value
|
||||
|
||||
|
||||
@pytest.mark.requires("writerai")
|
||||
class TestWriterLLM:
|
||||
"""Unit tests for Writer LLM integration."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_unstreaming_completion(self) -> Completion:
|
||||
"""Fixture providing a mock API response."""
|
||||
return Completion(choices=[Choice(text="Hello! How can I help you?")])
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_streaming_completion(self) -> List[StreamingData]:
|
||||
"""Fixture providing mock streaming response chunks."""
|
||||
return [
|
||||
StreamingData(value="Hello! "),
|
||||
StreamingData(value="How can I"),
|
||||
StreamingData(value=" help you?"),
|
||||
]
|
||||
|
||||
def test_sync_unstream_completion(
|
||||
self, mock_unstreaming_completion: Completion
|
||||
) -> None:
|
||||
"""Test basic llm call with mocked response."""
|
||||
mock_client = MagicMock()
|
||||
mock_client.completions.create.return_value = mock_unstreaming_completion
|
||||
|
||||
llm = Writer(api_key=SecretStr("key"))
|
||||
|
||||
with mock.patch.object(llm, "client", mock_client):
|
||||
response_text = llm.invoke(input="Hello")
|
||||
|
||||
assert response_text == "Hello! How can I help you?"
|
||||
|
||||
def test_sync_unstream_completion_with_params(
|
||||
self, mock_unstreaming_completion: Completion
|
||||
) -> None:
|
||||
"""Test llm call with passed params with mocked response."""
|
||||
mock_client = MagicMock()
|
||||
mock_client.completions.create.return_value = mock_unstreaming_completion
|
||||
|
||||
llm = Writer(api_key=SecretStr("key"), temperature=1)
|
||||
|
||||
with mock.patch.object(llm, "client", mock_client):
|
||||
response_text = llm.invoke(input="Hello")
|
||||
|
||||
assert response_text == "Hello! How can I help you?"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_unstream_completion(
|
||||
self, mock_unstreaming_completion: Completion
|
||||
) -> None:
|
||||
"""Test async chat completion with mocked response."""
|
||||
mock_async_client = AsyncMock()
|
||||
mock_async_client.completions.create.return_value = mock_unstreaming_completion
|
||||
|
||||
llm = Writer(api_key=SecretStr("key"))
|
||||
|
||||
with mock.patch.object(llm, "async_client", mock_async_client):
|
||||
response_text = await llm.ainvoke(input="Hello")
|
||||
|
||||
assert response_text == "Hello! How can I help you?"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_unstream_completion_with_params(
|
||||
self, mock_unstreaming_completion: Completion
|
||||
) -> None:
|
||||
"""Test async llm call with passed params with mocked response."""
|
||||
mock_async_client = AsyncMock()
|
||||
mock_async_client.completions.create.return_value = mock_unstreaming_completion
|
||||
|
||||
llm = Writer(api_key=SecretStr("key"), temperature=1)
|
||||
|
||||
with mock.patch.object(llm, "async_client", mock_async_client):
|
||||
response_text = await llm.ainvoke(input="Hello")
|
||||
|
||||
assert response_text == "Hello! How can I help you?"
|
||||
|
||||
def test_sync_streaming_completion(
|
||||
self, mock_streaming_completion: List[StreamingData]
|
||||
) -> None:
|
||||
"""Test sync streaming."""
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.__iter__.return_value = mock_streaming_completion
|
||||
mock_client.completions.create.return_value = mock_response
|
||||
|
||||
llm = Writer(api_key=SecretStr("key"))
|
||||
|
||||
with mock.patch.object(llm, "client", mock_client):
|
||||
response = llm.stream(input="Hello")
|
||||
|
||||
response_message = ""
|
||||
for chunk in response:
|
||||
response_message += chunk
|
||||
|
||||
assert response_message == "Hello! How can I help you?"
|
||||
|
||||
def test_sync_streaming_completion_with_callback_handler(
|
||||
self, mock_streaming_completion: List[StreamingData]
|
||||
) -> None:
|
||||
"""Test sync streaming with callback handler."""
|
||||
callback_handler = FakeCallbackHandler()
|
||||
callback_manager = CallbackManager([callback_handler])
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.__iter__.return_value = mock_streaming_completion
|
||||
mock_client.completions.create.return_value = mock_response
|
||||
|
||||
llm = Writer(
|
||||
api_key=SecretStr("key"),
|
||||
callback_manager=callback_manager,
|
||||
)
|
||||
|
||||
with mock.patch.object(llm, "client", mock_client):
|
||||
response = llm.stream(input="Hello")
|
||||
|
||||
response_message = ""
|
||||
for chunk in response:
|
||||
response_message += chunk
|
||||
|
||||
assert callback_handler.llm_streams == 3
|
||||
assert response_message == "Hello! How can I help you?"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_streaming_completion(
|
||||
self, mock_streaming_completion: Completion
|
||||
) -> None:
|
||||
"""Test async streaming with callback handler."""
|
||||
|
||||
mock_async_client = AsyncMock()
|
||||
mock_response = AsyncMock()
|
||||
mock_response.__aiter__.return_value = mock_streaming_completion
|
||||
mock_async_client.completions.create.return_value = mock_response
|
||||
|
||||
llm = Writer(api_key=SecretStr("key"))
|
||||
|
||||
with mock.patch.object(llm, "async_client", mock_async_client):
|
||||
response = llm.astream(input="Hello")
|
||||
|
||||
response_message = ""
|
||||
async for chunk in response:
|
||||
response_message += str(chunk)
|
||||
|
||||
assert response_message == "Hello! How can I help you?"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_streaming_completion_with_callback_handler(
|
||||
self, mock_streaming_completion: Completion
|
||||
) -> None:
|
||||
"""Test async streaming with callback handler."""
|
||||
callback_handler = FakeCallbackHandler()
|
||||
callback_manager = CallbackManager([callback_handler])
|
||||
|
||||
mock_async_client = AsyncMock()
|
||||
mock_response = AsyncMock()
|
||||
mock_response.__aiter__.return_value = mock_streaming_completion
|
||||
mock_async_client.completions.create.return_value = mock_response
|
||||
|
||||
llm = Writer(
|
||||
api_key=SecretStr("key"),
|
||||
callback_manager=callback_manager,
|
||||
)
|
||||
|
||||
with mock.patch.object(llm, "async_client", mock_async_client):
|
||||
response = llm.astream(input="Hello")
|
||||
|
||||
response_message = ""
|
||||
async for chunk in response:
|
||||
response_message += str(chunk)
|
||||
|
||||
assert callback_handler.llm_streams == 3
|
||||
assert response_message == "Hello! How can I help you?"
|
Reference in New Issue
Block a user