mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-28 15:00:23 +00:00
```python """python scripts/update_mypy_ruff.py""" import glob import tomllib from pathlib import Path import toml import subprocess import re ROOT_DIR = Path(__file__).parents[1] def main(): for path in glob.glob(str(ROOT_DIR / "libs/**/pyproject.toml"), recursive=True): print(path) with open(path, "rb") as f: pyproject = tomllib.load(f) try: pyproject["tool"]["poetry"]["group"]["typing"]["dependencies"]["mypy"] = ( "^1.10" ) pyproject["tool"]["poetry"]["group"]["lint"]["dependencies"]["ruff"] = ( "^0.5" ) except KeyError: continue with open(path, "w") as f: toml.dump(pyproject, f) cwd = "/".join(path.split("/")[:-1]) completed = subprocess.run( "poetry lock --no-update; poetry install --with typing; poetry run mypy . --no-color", cwd=cwd, shell=True, capture_output=True, text=True, ) logs = completed.stdout.split("\n") to_ignore = {} for l in logs: if re.match("^(.*)\:(\d+)\: error:.*\[(.*)\]", l): path, line_no, error_type = re.match( "^(.*)\:(\d+)\: error:.*\[(.*)\]", l ).groups() if (path, line_no) in to_ignore: to_ignore[(path, line_no)].append(error_type) else: to_ignore[(path, line_no)] = [error_type] print(len(to_ignore)) for (error_path, line_no), error_types in to_ignore.items(): all_errors = ", ".join(error_types) full_path = f"{cwd}/{error_path}" try: with open(full_path, "r") as f: file_lines = f.readlines() except FileNotFoundError: continue file_lines[int(line_no) - 1] = ( file_lines[int(line_no) - 1][:-1] + f" # type: ignore[{all_errors}]\n" ) with open(full_path, "w") as f: f.write("".join(file_lines)) subprocess.run( "poetry run ruff format .; poetry run ruff --select I --fix .", cwd=cwd, shell=True, capture_output=True, text=True, ) if __name__ == "__main__": main() ```
100 lines
3.3 KiB
Python
100 lines
3.3 KiB
Python
"""Util that calls Google Finance Search."""
|
|
|
|
from typing import Any, Dict, Optional, cast
|
|
|
|
from langchain_core.pydantic_v1 import BaseModel, Extra, SecretStr, root_validator
|
|
from langchain_core.utils import convert_to_secret_str, get_from_dict_or_env
|
|
|
|
|
|
class GoogleFinanceAPIWrapper(BaseModel):
|
|
"""Wrapper for SerpApi's Google Finance API
|
|
|
|
You can create SerpApi.com key by signing up at: https://serpapi.com/users/sign_up.
|
|
The wrapper uses the SerpApi.com python package:
|
|
https://serpapi.com/integrations/python
|
|
To use, you should have the environment variable ``SERPAPI_API_KEY``
|
|
set with your API key, or pass `serp_api_key` as a named parameter
|
|
to the constructor.
|
|
Example:
|
|
.. code-block:: python
|
|
from langchain_community.utilities import GoogleFinanceAPIWrapper
|
|
google_Finance = GoogleFinanceAPIWrapper()
|
|
google_Finance.run('langchain')
|
|
"""
|
|
|
|
serp_search_engine: Any
|
|
serp_api_key: Optional[SecretStr] = None
|
|
|
|
class Config:
|
|
"""Configuration for this pydantic object."""
|
|
|
|
extra = Extra.forbid
|
|
|
|
@root_validator(pre=True)
|
|
def validate_environment(cls, values: Dict) -> Dict:
|
|
"""Validate that api key and python package exists in environment."""
|
|
values["serp_api_key"] = convert_to_secret_str(
|
|
get_from_dict_or_env(values, "serp_api_key", "SERPAPI_API_KEY")
|
|
)
|
|
|
|
try:
|
|
from serpapi import SerpApiClient
|
|
|
|
except ImportError:
|
|
raise ImportError(
|
|
"google-search-results is not installed. "
|
|
"Please install it with `pip install google-search-results"
|
|
">=2.4.2`"
|
|
)
|
|
serp_search_engine = SerpApiClient
|
|
values["serp_search_engine"] = serp_search_engine
|
|
|
|
return values
|
|
|
|
def run(self, query: str) -> str:
|
|
"""Run query through Google Finance with Serpapi"""
|
|
serpapi_api_key = cast(SecretStr, self.serp_api_key)
|
|
params = {
|
|
"engine": "google_finance",
|
|
"api_key": serpapi_api_key.get_secret_value(),
|
|
"q": query,
|
|
}
|
|
|
|
total_results = {}
|
|
client = self.serp_search_engine(params)
|
|
total_results = client.get_dict()
|
|
|
|
if not total_results:
|
|
return "Nothing was found from the query: " + query
|
|
|
|
markets = total_results.get("markets", {})
|
|
res = "\nQuery: " + query + "\n"
|
|
|
|
if "futures_chain" in total_results:
|
|
futures_chain = total_results.get("futures_chain", [])[0]
|
|
stock = futures_chain["stock"]
|
|
price = futures_chain["price"]
|
|
temp = futures_chain["price_movement"]
|
|
percentage = temp["percentage"]
|
|
movement = temp["movement"]
|
|
res += (
|
|
f"stock: {stock}\n"
|
|
+ f"price: {price}\n"
|
|
+ f"percentage: {percentage}\n"
|
|
+ f"movement: {movement}\n"
|
|
)
|
|
|
|
else:
|
|
res += "No summary information\n"
|
|
|
|
for key in markets:
|
|
if (key == "us") or (key == "asia") or (key == "europe"):
|
|
res += key
|
|
res += ": price = "
|
|
res += str(markets[key][0]["price"])
|
|
res += ", movement = "
|
|
res += markets[key][0]["price_movement"]["movement"]
|
|
res += "\n"
|
|
|
|
return res
|