langchain/libs/community/langchain_community/chat_models/naver.py
CLOVA Studio development 846a75284f
community: Add Naver chat model & embeddings (#25162)
Reopened as a personal repo outside the organization.

## Description
- Naver HyperCLOVA X community package 
  - Add chat model & embeddings (see the usage sketch after this list)
  - Add unit test & integration test
  - Add chat model & embeddings docs
- In this PR, I changed the partner
package (https://github.com/langchain-ai/langchain/pull/24252) to a
community package
- Could this
embeddings integration (https://github.com/langchain-ai/langchain/pull/21890) be
deprecated? We are trying to replace it with the embedding
model (**ClovaXEmbeddings**) in this PR.
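
For reference, a minimal usage sketch of the chat model added here (it assumes
the `NCP_CLOVASTUDIO_API_KEY` and `NCP_APIGW_API_KEY` environment variables are
set; `HCX-003` is the default model on the test app, per the source below):

```python
from langchain_core.messages import HumanMessage

from langchain_community.chat_models import ChatClovaX

# Reads NCP_CLOVASTUDIO_API_KEY / NCP_APIGW_API_KEY from the environment;
# defaults to the HCX-003 model on the CLOVA Studio test app.
chat = ChatClovaX()
print(chat.invoke([HumanMessage(content="Hello, CLOVA!")]).content)
```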

Twitter handle: None. (If needed, contact
joonha.jeon@navercorp.com.)

---
You can check our previous discussion below:

> one question on namespaces - would it make sense to have these in
.clova namespaces instead of .naver?

I would like to keep it as is, unless it is essential to unify the
package name.
(ClovaX is the branding for the model, and I plan to add other models and
components, which need to be managed as separate classes.)

> also, could you clarify the difference between ClovaEmbeddings and
ClovaXEmbeddings?

There are three embedding models currently in service, and all of them are
supported in this PR. In addition, all of the CLOVA Studio functionality for
serving actual models, such as distinguishing between test apps and service
apps, is supported. The existing PR does not support this because those
values are hard-coded.
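
As a sketch of what that looks like with the new class (a minimal example
assuming the embeddings class mirrors the chat model's configuration; the
`model` value and `service_app` flag shown here are illustrative, so check the
embeddings docs added in this PR for the exact names):

```python
from langchain_community.embeddings import ClovaXEmbeddings

# service_app=False targets the test app; set it to True once the
# embedding model is deployed as a service app in CLOVA Studio.
embeddings = ClovaXEmbeddings(model="clir-emb-dolphin", service_app=False)
vector = embeddings.embed_query("How does CLOVA Studio distinguish app types?")
print(len(vector))
```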

---------

Co-authored-by: Erick Friis <erick@langchain.dev>
Co-authored-by: Vadym Barda <vadym@langchain.dev>
2024-10-24 20:54:13 +00:00


import logging
from typing import (
    Any,
    AsyncContextManager,
    AsyncIterator,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Type,
    Union,
    cast,
)

import httpx
from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models.chat_models import BaseChatModel, LangSmithParams
from langchain_core.language_models.llms import create_base_retry_decorator
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    BaseMessageChunk,
    ChatMessage,
    ChatMessageChunk,
    HumanMessage,
    HumanMessageChunk,
    SystemMessage,
    SystemMessageChunk,
)
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.utils import convert_to_secret_str, get_from_env
from pydantic import AliasChoices, Field, SecretStr, model_validator
from typing_extensions import Self

_DEFAULT_BASE_URL = "https://clovastudio.stream.ntruss.com"

logger = logging.getLogger(__name__)


def _convert_chunk_to_message_chunk(
    sse: Any, default_class: Type[BaseMessageChunk]
) -> BaseMessageChunk:
    sse_data = sse.json()
    message = sse_data.get("message")
    role = message.get("role")
    content = message.get("content") or ""

    if role == "user" or default_class == HumanMessageChunk:
        return HumanMessageChunk(content=content)
    elif role == "assistant" or default_class == AIMessageChunk:
        return AIMessageChunk(content=content)
    elif role == "system" or default_class == SystemMessageChunk:
        return SystemMessageChunk(content=content)
    elif role or default_class == ChatMessageChunk:
        return ChatMessageChunk(content=content, role=role)
    else:
        return default_class(content=content)  # type: ignore[call-arg]


def _convert_message_to_naver_chat_message(
    message: BaseMessage,
) -> Dict:
    if isinstance(message, ChatMessage):
        return dict(role=message.role, content=message.content)
    elif isinstance(message, HumanMessage):
        return dict(role="user", content=message.content)
    elif isinstance(message, SystemMessage):
        return dict(role="system", content=message.content)
    elif isinstance(message, AIMessage):
        return dict(role="assistant", content=message.content)
    else:
        logger.warning(
            "FunctionMessage, ToolMessage not yet supported "
            "(https://api.ncloud-docs.com/docs/clovastudio-chatcompletions)"
        )
        raise ValueError(f"Got unknown type {message}")


def _convert_naver_chat_message_to_message(
    _message: Dict,
) -> BaseMessage:
    role = _message["role"]
    assert role in (
        "assistant",
        "system",
        "user",
    ), f"Expected role to be 'assistant', 'system', 'user', got {role}"
    content = cast(str, _message["content"])
    additional_kwargs: Dict = {}

    if role == "user":
        return HumanMessage(
            content=content,
            additional_kwargs=additional_kwargs,
        )
    elif role == "system":
        return SystemMessage(
            content=content,
            additional_kwargs=additional_kwargs,
        )
    elif role == "assistant":
        return AIMessage(
            content=content,
            additional_kwargs=additional_kwargs,
        )
    else:
        logger.warning("Got unknown role %s", role)
        raise ValueError(f"Got unknown role {role}")


async def _aiter_sse(
    event_source_mgr: AsyncContextManager[Any],
) -> AsyncIterator[Dict]:
    """Iterate over the server-sent events."""
    async with event_source_mgr as event_source:
        await _araise_on_error(event_source.response)
        async for sse in event_source.aiter_sse():
            event_data = sse.json()
            if sse.event == "signal" and event_data.get("data", {}) == "[DONE]":
                return
            if sse.event == "result":
                return
            yield sse


def _raise_on_error(response: httpx.Response) -> None:
    """Raise an error if the response is an error."""
    if httpx.codes.is_error(response.status_code):
        error_message = response.read().decode("utf-8")
        raise httpx.HTTPStatusError(
            f"Error response {response.status_code} "
            f"while fetching {response.url}: {error_message}",
            request=response.request,
            response=response,
        )


async def _araise_on_error(response: httpx.Response) -> None:
    """Raise an error if the response is an error."""
    if httpx.codes.is_error(response.status_code):
        error_message = (await response.aread()).decode("utf-8")
        raise httpx.HTTPStatusError(
            f"Error response {response.status_code} "
            f"while fetching {response.url}: {error_message}",
            request=response.request,
            response=response,
        )


class ChatClovaX(BaseChatModel):
    """`NCP ClovaStudio` Chat Completion API.

    To use, you should have the following environment variables set, or pass
    them to the constructor in lower case:
    - ``NCP_CLOVASTUDIO_API_KEY``
    - ``NCP_APIGW_API_KEY``

    Example:
        .. code-block:: python

            from langchain_core.messages import HumanMessage

            from langchain_community.chat_models import ChatClovaX

            model = ChatClovaX()
            model.invoke([HumanMessage(content="Come up with 10 names for a song about parrots.")])
    """  # noqa: E501

    client: httpx.Client = Field(default=None)  #: :meta private:
    async_client: httpx.AsyncClient = Field(default=None)  #: :meta private:

    model_name: str = Field(
        default="HCX-003",
        validation_alias=AliasChoices("model_name", "model"),
        description="NCP ClovaStudio chat model name",
    )
    task_id: Optional[str] = Field(
        default=None, description="NCP ClovaStudio chat model tuning task ID"
    )
    service_app: bool = Field(
        default=False,
        description="false: use testapp, true: use service app on NCP ClovaStudio",
    )

    ncp_clovastudio_api_key: Optional[SecretStr] = Field(default=None, alias="api_key")
    """Automatically inferred from the `NCP_CLOVASTUDIO_API_KEY` env var if not provided."""

    ncp_apigw_api_key: Optional[SecretStr] = Field(default=None, alias="apigw_api_key")
    """Automatically inferred from the `NCP_APIGW_API_KEY` env var if not provided."""

    base_url: str = Field(default=None, alias="base_url")
    """
    Automatically inferred from the `NCP_CLOVASTUDIO_API_BASE_URL` env var
    if not provided.
    """

    temperature: Optional[float] = Field(gt=0.0, le=1.0, default=0.5)
    top_k: Optional[int] = Field(ge=0, le=128, default=0)
    top_p: Optional[float] = Field(ge=0, le=1.0, default=0.8)
    repeat_penalty: Optional[float] = Field(gt=0.0, le=10, default=5.0)
    max_tokens: Optional[int] = Field(ge=0, le=4096, default=100)
    stop_before: Optional[list[str]] = Field(default=None, alias="stop")
    include_ai_filters: Optional[bool] = Field(default=False)
    seed: Optional[int] = Field(ge=0, le=4294967295, default=0)

    timeout: int = Field(gt=0, default=90)
    max_retries: int = Field(ge=1, default=2)

    class Config:
        """Configuration for this pydantic object."""

        populate_by_name = True

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling the API."""
        defaults = {
            "temperature": self.temperature,
            "topK": self.top_k,
            "topP": self.top_p,
            "repeatPenalty": self.repeat_penalty,
            "maxTokens": self.max_tokens,
            "stopBefore": self.stop_before,
            "includeAiFilters": self.include_ai_filters,
            "seed": self.seed,
        }
        filtered = {k: v for k, v in defaults.items() if v is not None}
        return filtered

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Get the identifying parameters."""
        # _default_params returns a fresh dict each access, so build the
        # result in one expression instead of mutating a throwaway copy.
        return {**self._default_params, "model_name": self.model_name}

    @property
    def lc_secrets(self) -> Dict[str, str]:
        return {
            "ncp_clovastudio_api_key": "NCP_CLOVASTUDIO_API_KEY",
            "ncp_apigw_api_key": "NCP_APIGW_API_KEY",
        }

    @property
    def _llm_type(self) -> str:
        """Return type of chat model."""
        return "chat-naver"

    def _get_ls_params(
        self, stop: Optional[List[str]] = None, **kwargs: Any
    ) -> LangSmithParams:
        """Get the parameters used to invoke the model."""
        params = super()._get_ls_params(stop=stop, **kwargs)
        params["ls_provider"] = "naver"
        return params

    @property
    def _client_params(self) -> Dict[str, Any]:
        """Get the parameters used for the client."""
        return self._default_params

    @property
    def _api_url(self) -> str:
        """Get the chat completion API URL."""
        app_type = "serviceapp" if self.service_app else "testapp"

        if self.task_id:
            return (
                f"{self.base_url}/{app_type}/v1/tasks/{self.task_id}/chat-completions"
            )
        else:
            return f"{self.base_url}/{app_type}/v1/chat-completions/{self.model_name}"
@model_validator(mode="after")
def validate_model_after(self) -> Self:
if not (self.model_name or self.task_id):
raise ValueError("either model_name or task_id must be assigned a value.")
if not self.ncp_clovastudio_api_key:
self.ncp_clovastudio_api_key = convert_to_secret_str(
get_from_env("ncp_clovastudio_api_key", "NCP_CLOVASTUDIO_API_KEY")
)
if not self.ncp_apigw_api_key:
self.ncp_apigw_api_key = convert_to_secret_str(
get_from_env("ncp_apigw_api_key", "NCP_APIGW_API_KEY")
)
if not self.base_url:
self.base_url = get_from_env(
"base_url", "NCP_CLOVASTUDIO_API_BASE_URL", _DEFAULT_BASE_URL
)
if not self.client:
self.client = httpx.Client(
base_url=self.base_url,
headers=self.default_headers(),
timeout=self.timeout,
)
if not self.async_client:
self.async_client = httpx.AsyncClient(
base_url=self.base_url,
headers=self.default_headers(),
timeout=self.timeout,
)
return self

    def default_headers(self) -> Dict[str, Any]:
        clovastudio_api_key = (
            self.ncp_clovastudio_api_key.get_secret_value()
            if self.ncp_clovastudio_api_key
            else None
        )
        apigw_api_key = (
            self.ncp_apigw_api_key.get_secret_value()
            if self.ncp_apigw_api_key
            else None
        )
        return {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "X-NCP-CLOVASTUDIO-API-KEY": clovastudio_api_key,
            "X-NCP-APIGW-API-KEY": apigw_api_key,
        }

    def _create_message_dicts(
        self, messages: List[BaseMessage], stop: Optional[List[str]]
    ) -> Tuple[List[Dict], Dict[str, Any]]:
        params = self._client_params
        if stop is not None and "stopBefore" in params:
            params["stopBefore"] = stop

        message_dicts = [_convert_message_to_naver_chat_message(m) for m in messages]
        return message_dicts, params

    def _completion_with_retry(self, **kwargs: Any) -> Any:
        from httpx_sse import (
            ServerSentEvent,
            SSEError,
            connect_sse,
        )

        if "stream" not in kwargs:
            kwargs["stream"] = False

        stream = kwargs["stream"]
        if stream:

            def iter_sse() -> Iterator[ServerSentEvent]:
                with connect_sse(
                    self.client, "POST", self._api_url, json=kwargs
                ) as event_source:
                    _raise_on_error(event_source.response)
                    for sse in event_source.iter_sse():
                        event_data = sse.json()
                        if (
                            sse.event == "signal"
                            and event_data.get("data", {}) == "[DONE]"
                        ):
                            return
                        if sse.event == "result":
                            return
                        if sse.event == "error":
                            raise SSEError(message=sse.data)
                        yield sse

            return iter_sse()
        else:
            response = self.client.post(url=self._api_url, json=kwargs)
            _raise_on_error(response)
            return response.json()

    async def _acompletion_with_retry(
        self,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Any:
        """Use tenacity to retry the async completion call."""
        from httpx_sse import aconnect_sse

        retry_decorator = _create_retry_decorator(self, run_manager=run_manager)

        @retry_decorator
        async def _completion_with_retry(**kwargs: Any) -> Any:
            if "stream" not in kwargs:
                kwargs["stream"] = False

            stream = kwargs["stream"]
            if stream:
                event_source = aconnect_sse(
                    self.async_client, "POST", self._api_url, json=kwargs
                )
                return _aiter_sse(event_source)
            else:
                response = await self.async_client.post(url=self._api_url, json=kwargs)
                await _araise_on_error(response)
                return response.json()

        return await _completion_with_retry(**kwargs)

    def _create_chat_result(self, response: Dict) -> ChatResult:
        generations = []
        result = response.get("result", {})
        msg = result.get("message", {})
        message = _convert_naver_chat_message_to_message(msg)

        if isinstance(message, AIMessage):
            message.usage_metadata = {
                "input_tokens": result.get("inputLength"),
                "output_tokens": result.get("outputLength"),
                "total_tokens": result.get("inputLength") + result.get("outputLength"),
            }
        gen = ChatGeneration(
            message=message,
        )
        generations.append(gen)

        llm_output = {
            "stop_reason": result.get("stopReason"),
            "input_length": result.get("inputLength"),
            "output_length": result.get("outputLength"),
            "seed": result.get("seed"),
            "ai_filter": result.get("aiFilter"),
        }
        return ChatResult(generations=generations, llm_output=llm_output)

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs}

        response = self._completion_with_retry(messages=message_dicts, **params)

        return self._create_chat_result(response)

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}

        default_chunk_class: Type[BaseMessageChunk] = AIMessageChunk
        # run_manager is deliberately not forwarded: the synchronous
        # _completion_with_retry takes only **kwargs and posts them as the
        # JSON request body, so a callback manager must not end up there.
        for sse in self._completion_with_retry(messages=message_dicts, **params):
            new_chunk = _convert_chunk_to_message_chunk(sse, default_chunk_class)
            default_chunk_class = new_chunk.__class__
            gen_chunk = ChatGenerationChunk(message=new_chunk)

            if run_manager:
                run_manager.on_llm_new_token(
                    token=cast(str, new_chunk.content), chunk=gen_chunk
                )

            yield gen_chunk

    async def _agenerate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs}

        response = await self._acompletion_with_retry(
            messages=message_dicts, run_manager=run_manager, **params
        )

        return self._create_chat_result(response)

    async def _astream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}

        default_chunk_class: Type[BaseMessageChunk] = AIMessageChunk
        async for chunk in await self._acompletion_with_retry(
            messages=message_dicts, run_manager=run_manager, **params
        ):
            new_chunk = _convert_chunk_to_message_chunk(chunk, default_chunk_class)
            default_chunk_class = new_chunk.__class__
            gen_chunk = ChatGenerationChunk(message=new_chunk)

            if run_manager:
                await run_manager.on_llm_new_token(
                    token=cast(str, new_chunk.content), chunk=gen_chunk
                )

            yield gen_chunk


def _create_retry_decorator(
    llm: ChatClovaX,
    run_manager: Optional[
        Union[AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun]
    ] = None,
) -> Callable[[Any], Any]:
    """Returns a tenacity retry decorator, preconfigured to handle exceptions."""
    errors = [httpx.RequestError, httpx.StreamError]
    return create_base_retry_decorator(
        error_types=errors, max_retries=llm.max_retries, run_manager=run_manager
    )
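

# A minimal streaming smoke test (an illustrative sketch, not part of the
# library API): assumes the NCP_CLOVASTUDIO_API_KEY and NCP_APIGW_API_KEY
# environment variables are set and the default HCX-003 test app is reachable.
if __name__ == "__main__":
    from langchain_core.messages import HumanMessage

    chat = ChatClovaX(max_tokens=128)
    for chunk in chat.stream([HumanMessage(content="Say hello in one sentence.")]):
        print(chunk.content, end="", flush=True)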