"""Wrapper around Perplexity APIs.""" from __future__ import annotations import logging from operator import itemgetter from typing import ( Any, Dict, Iterator, List, Literal, Mapping, Optional, Tuple, Type, TypeVar, Union, ) from langchain_core.callbacks import CallbackManagerForLLMRun from langchain_core.language_models import LanguageModelInput from langchain_core.language_models.chat_models import ( BaseChatModel, generate_from_stream, ) from langchain_core.messages import ( AIMessage, AIMessageChunk, BaseMessage, BaseMessageChunk, ChatMessage, ChatMessageChunk, FunctionMessageChunk, HumanMessage, HumanMessageChunk, SystemMessage, SystemMessageChunk, ToolMessageChunk, ) from langchain_core.output_parsers import JsonOutputParser, PydanticOutputParser from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult from langchain_core.runnables import Runnable, RunnableMap, RunnablePassthrough from langchain_core.utils import from_env, get_pydantic_field_names from langchain_core.utils.pydantic import ( is_basemodel_subclass, ) from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, model_validator from typing_extensions import Self _BM = TypeVar("_BM", bound=BaseModel) _DictOrPydanticClass = Union[Dict[str, Any], Type[_BM], Type] _DictOrPydantic = Union[Dict, _BM] logger = logging.getLogger(__name__) def _is_pydantic_class(obj: Any) -> bool: return isinstance(obj, type) and is_basemodel_subclass(obj) class ChatPerplexity(BaseChatModel): """`Perplexity AI` Chat models API. To use, you should have the ``openai`` python package installed, and the environment variable ``PPLX_API_KEY`` set to your API key. Any parameters that are valid to be passed to the openai.create call can be passed in, even if not explicitly saved on this class. Example: .. code-block:: python from langchain_community.chat_models import ChatPerplexity chat = ChatPerplexity( model="llama-3.1-sonar-small-128k-online", temperature=0.7, ) """ client: Any = None #: :meta private: model: str = "llama-3.1-sonar-small-128k-online" """Model name.""" temperature: float = 0.7 """What sampling temperature to use.""" model_kwargs: Dict[str, Any] = Field(default_factory=dict) """Holds any model parameters valid for `create` call not explicitly specified.""" pplx_api_key: Optional[str] = Field( default_factory=from_env("PPLX_API_KEY", default=None), alias="api_key" ) """Base URL path for API requests, leave blank if not using a proxy or service emulator.""" request_timeout: Optional[Union[float, Tuple[float, float]]] = Field( None, alias="timeout" ) """Timeout for requests to PerplexityChat completion API. Default is None.""" max_retries: int = 6 """Maximum number of retries to make when generating.""" streaming: bool = False """Whether to stream the results or not.""" max_tokens: Optional[int] = None """Maximum number of tokens to generate.""" model_config = ConfigDict( populate_by_name=True, ) @property def lc_secrets(self) -> Dict[str, str]: return {"pplx_api_key": "PPLX_API_KEY"} @model_validator(mode="before") @classmethod def build_extra(cls, values: Dict[str, Any]) -> Any: """Build extra kwargs from additional params that were passed in.""" all_required_field_names = get_pydantic_field_names(cls) extra = values.get("model_kwargs", {}) for field_name in list(values): if field_name in extra: raise ValueError(f"Found {field_name} supplied twice.") if field_name not in all_required_field_names: logger.warning( f"""WARNING! {field_name} is not a default parameter. {field_name} was transferred to model_kwargs. Please confirm that {field_name} is what you intended.""" ) extra[field_name] = values.pop(field_name) invalid_model_kwargs = all_required_field_names.intersection(extra.keys()) if invalid_model_kwargs: raise ValueError( f"Parameters {invalid_model_kwargs} should be specified explicitly. " f"Instead they were passed in as part of `model_kwargs` parameter." ) values["model_kwargs"] = extra return values @model_validator(mode="after") def validate_environment(self) -> Self: """Validate that api key and python package exists in environment.""" try: import openai except ImportError: raise ImportError( "Could not import openai python package. " "Please install it with `pip install openai`." ) try: self.client = openai.OpenAI( api_key=self.pplx_api_key, base_url="https://api.perplexity.ai" ) except AttributeError: raise ValueError( "`openai` has no `ChatCompletion` attribute, this is likely " "due to an old version of the openai package. Try upgrading it " "with `pip install --upgrade openai`." ) return self @property def _default_params(self) -> Dict[str, Any]: """Get the default parameters for calling PerplexityChat API.""" return { "max_tokens": self.max_tokens, "stream": self.streaming, "temperature": self.temperature, **self.model_kwargs, } def _convert_message_to_dict(self, message: BaseMessage) -> Dict[str, Any]: if isinstance(message, ChatMessage): message_dict = {"role": message.role, "content": message.content} elif isinstance(message, SystemMessage): message_dict = {"role": "system", "content": message.content} elif isinstance(message, HumanMessage): message_dict = {"role": "user", "content": message.content} elif isinstance(message, AIMessage): message_dict = {"role": "assistant", "content": message.content} else: raise TypeError(f"Got unknown type {message}") return message_dict def _create_message_dicts( self, messages: List[BaseMessage], stop: Optional[List[str]] ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: params = dict(self._invocation_params) if stop is not None: if "stop" in params: raise ValueError("`stop` found in both the input and default params.") params["stop"] = stop message_dicts = [self._convert_message_to_dict(m) for m in messages] return message_dicts, params def _convert_delta_to_message_chunk( self, _dict: Mapping[str, Any], default_class: Type[BaseMessageChunk] ) -> BaseMessageChunk: role = _dict.get("role") content = _dict.get("content") or "" additional_kwargs: Dict = {} if _dict.get("function_call"): function_call = dict(_dict["function_call"]) if "name" in function_call and function_call["name"] is None: function_call["name"] = "" additional_kwargs["function_call"] = function_call if _dict.get("tool_calls"): additional_kwargs["tool_calls"] = _dict["tool_calls"] if role == "user" or default_class == HumanMessageChunk: return HumanMessageChunk(content=content) elif role == "assistant" or default_class == AIMessageChunk: return AIMessageChunk(content=content, additional_kwargs=additional_kwargs) elif role == "system" or default_class == SystemMessageChunk: return SystemMessageChunk(content=content) elif role == "function" or default_class == FunctionMessageChunk: return FunctionMessageChunk(content=content, name=_dict["name"]) elif role == "tool" or default_class == ToolMessageChunk: return ToolMessageChunk(content=content, tool_call_id=_dict["tool_call_id"]) elif role or default_class == ChatMessageChunk: return ChatMessageChunk(content=content, role=role) # type: ignore[arg-type] else: return default_class(content=content) # type: ignore[call-arg] def _stream( self, messages: List[BaseMessage], stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> Iterator[ChatGenerationChunk]: message_dicts, params = self._create_message_dicts(messages, stop) params = {**params, **kwargs} default_chunk_class = AIMessageChunk params.pop("stream", None) if stop: params["stop_sequences"] = stop stream_resp = self.client.chat.completions.create( messages=message_dicts, stream=True, **params ) first_chunk = True for chunk in stream_resp: if not isinstance(chunk, dict): chunk = chunk.dict() if len(chunk["choices"]) == 0: continue choice = chunk["choices"][0] citations = chunk.get("citations", []) chunk = self._convert_delta_to_message_chunk( choice["delta"], default_chunk_class ) if first_chunk: chunk.additional_kwargs |= {"citations": citations} first_chunk = False finish_reason = choice.get("finish_reason") generation_info = ( dict(finish_reason=finish_reason) if finish_reason is not None else None ) default_chunk_class = chunk.__class__ chunk = ChatGenerationChunk(message=chunk, generation_info=generation_info) if run_manager: run_manager.on_llm_new_token(chunk.text, chunk=chunk) yield chunk def _generate( self, messages: List[BaseMessage], stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> ChatResult: if self.streaming: stream_iter = self._stream( messages, stop=stop, run_manager=run_manager, **kwargs ) if stream_iter: return generate_from_stream(stream_iter) message_dicts, params = self._create_message_dicts(messages, stop) params = {**params, **kwargs} response = self.client.chat.completions.create(messages=message_dicts, **params) message = AIMessage( content=response.choices[0].message.content, additional_kwargs={"citations": response.citations}, ) return ChatResult(generations=[ChatGeneration(message=message)]) @property def _invocation_params(self) -> Mapping[str, Any]: """Get the parameters used to invoke the model.""" pplx_creds: Dict[str, Any] = { "model": self.model, } return {**pplx_creds, **self._default_params} @property def _llm_type(self) -> str: """Return type of chat model.""" return "perplexitychat" def with_structured_output( self, schema: Optional[_DictOrPydanticClass] = None, *, method: Literal["json_schema"] = "json_schema", include_raw: bool = False, strict: Optional[bool] = None, **kwargs: Any, ) -> Runnable[LanguageModelInput, _DictOrPydantic]: """Model wrapper that returns outputs formatted to match the given schema for Preplexity. Currently, Preplexity only supports "json_schema" method for structured output as per their official documentation: https://docs.perplexity.ai/guides/structured-outputs Args: schema: The output schema. Can be passed in as: - a JSON Schema, - a TypedDict class, - or a Pydantic class method: The method for steering model generation, currently only support: - "json_schema": Use the JSON Schema to parse the model output include_raw: If False then only the parsed structured output is returned. If an error occurs during model output parsing it will be raised. If True then both the raw model response (a BaseMessage) and the parsed model response will be returned. If an error occurs during output parsing it will be caught and returned as well. The final output is always a dict with keys "raw", "parsed", and "parsing_error". kwargs: Additional keyword args aren't supported. Returns: A Runnable that takes same inputs as a :class:`langchain_core.language_models.chat.BaseChatModel`. | If ``include_raw`` is False and ``schema`` is a Pydantic class, Runnable outputs an instance of ``schema`` (i.e., a Pydantic object). Otherwise, if ``include_raw`` is False then Runnable outputs a dict. | If ``include_raw`` is True, then Runnable outputs a dict with keys: - "raw": BaseMessage - "parsed": None if there was a parsing error, otherwise the type depends on the ``schema`` as described above. - "parsing_error": Optional[BaseException] """ # noqa: E501 if method == "json_schema": if schema is None: raise ValueError( "schema must be specified when method is not 'json_schema'. " "Received None." ) is_pydantic_schema = _is_pydantic_class(schema) if is_pydantic_schema and hasattr( schema, "model_json_schema" ): # accounting for pydantic v1 and v2 response_format = schema.model_json_schema() # type: ignore[union-attr] elif is_pydantic_schema: response_format = schema.schema() # type: ignore[union-attr] elif isinstance(schema, dict): response_format = schema elif type(schema).__name__ == "_TypedDictMeta": adapter = TypeAdapter(schema) # if use passes typeddict response_format = adapter.json_schema() llm = self.bind( response_format={ "type": "json_schema", "json_schema": {"schema": response_format}, } ) output_parser = ( PydanticOutputParser(pydantic_object=schema) # type: ignore[arg-type] if is_pydantic_schema else JsonOutputParser() ) else: raise ValueError( f"Unrecognized method argument. Expected 'json_schema' Received:\ '{method}'" ) if include_raw: parser_assign = RunnablePassthrough.assign( parsed=itemgetter("raw") | output_parser, parsing_error=lambda _: None ) parser_none = RunnablePassthrough.assign(parsed=lambda _: None) parser_with_fallback = parser_assign.with_fallbacks( [parser_none], exception_key="parsing_error" ) return RunnableMap(raw=llm) | parser_with_fallback else: return llm | output_parser