mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-19 00:58:32 +00:00
community: Add PolygonAggregates tool (#18882)
**Description:** In this PR, I am adding a `PolygonAggregates` tool, which can be used to get historical stock price data (called aggregates by Polygon) for a given ticker. Polygon [docs](https://polygon.io/docs/stocks/get_v2_aggs_ticker__stocksticker__range__multiplier___timespan___from___to) for this endpoint. **Twitter**: [@virattt](https://twitter.com/virattt)
This commit is contained in:
@@ -3,6 +3,7 @@ from typing import List
|
||||
from langchain_community.agent_toolkits.base import BaseToolkit
|
||||
from langchain_community.tools import BaseTool
|
||||
from langchain_community.tools.polygon import (
|
||||
PolygonAggregates,
|
||||
PolygonFinancials,
|
||||
PolygonLastQuote,
|
||||
PolygonTickerNews,
|
||||
@@ -20,6 +21,9 @@ class PolygonToolkit(BaseToolkit):
|
||||
cls, polygon_api_wrapper: PolygonAPIWrapper
|
||||
) -> "PolygonToolkit":
|
||||
tools = [
|
||||
PolygonAggregates(
|
||||
api_wrapper=polygon_api_wrapper,
|
||||
),
|
||||
PolygonLastQuote(
|
||||
api_wrapper=polygon_api_wrapper,
|
||||
),
|
||||
|
@@ -504,6 +504,12 @@ def _import_plugin() -> Any:
|
||||
return AIPluginTool
|
||||
|
||||
|
||||
def _import_polygon_tool_PolygonAggregates() -> Any:
|
||||
from langchain_community.tools.polygon.aggregates import PolygonAggregates
|
||||
|
||||
return PolygonAggregates
|
||||
|
||||
|
||||
def _import_polygon_tool_PolygonFinancials() -> Any:
|
||||
from langchain_community.tools.polygon.financials import PolygonFinancials
|
||||
|
||||
@@ -973,6 +979,8 @@ def __getattr__(name: str) -> Any:
|
||||
return _import_playwright_NavigateTool()
|
||||
elif name == "AIPluginTool":
|
||||
return _import_plugin()
|
||||
elif name == "PolygonAggregates":
|
||||
return _import_polygon_tool_PolygonAggregates()
|
||||
elif name == "PolygonFinancials":
|
||||
return _import_polygon_tool_PolygonFinancials()
|
||||
elif name == "PolygonLastQuote":
|
||||
@@ -1164,6 +1172,7 @@ __all__ = [
|
||||
"OpenAPISpec",
|
||||
"OpenWeatherMapQueryRun",
|
||||
"PubmedQueryRun",
|
||||
"PolygonAggregates",
|
||||
"PolygonFinancials",
|
||||
"PolygonLastQuote",
|
||||
"PolygonTickerNews",
|
||||
|
@@ -1,10 +1,12 @@
|
||||
"""Polygon IO tools."""
|
||||
|
||||
from langchain_community.tools.polygon.aggregates import PolygonAggregates
|
||||
from langchain_community.tools.polygon.financials import PolygonFinancials
|
||||
from langchain_community.tools.polygon.last_quote import PolygonLastQuote
|
||||
from langchain_community.tools.polygon.ticker_news import PolygonTickerNews
|
||||
|
||||
__all__ = [
|
||||
"PolygonAggregates",
|
||||
"PolygonFinancials",
|
||||
"PolygonLastQuote",
|
||||
"PolygonTickerNews",
|
||||
|
@@ -0,0 +1,77 @@
|
||||
from typing import Optional, Type
|
||||
|
||||
from langchain_core.callbacks import CallbackManagerForToolRun
|
||||
from langchain_core.pydantic_v1 import BaseModel, Field
|
||||
from langchain_core.tools import BaseTool
|
||||
|
||||
from langchain_community.utilities.polygon import PolygonAPIWrapper
|
||||
|
||||
|
||||
class PolygonAggregatesSchema(BaseModel):
|
||||
"""Input for PolygonAggregates."""
|
||||
|
||||
ticker: str = Field(
|
||||
description="The ticker symbol to fetch aggregates for.",
|
||||
)
|
||||
timespan: str = Field(
|
||||
description="The size of the time window. "
|
||||
"Possible values are: "
|
||||
"second, minute, hour, day, week, month, quarter, year. "
|
||||
"Default is 'day'",
|
||||
)
|
||||
timespan_multiplier: int = Field(
|
||||
description="The number of timespans to aggregate. "
|
||||
"For example, if timespan is 'day' and "
|
||||
"timespan_multiplier is 1, the result will be daily bars. "
|
||||
"If timespan is 'day' and timespan_multiplier is 5, "
|
||||
"the result will be weekly bars. "
|
||||
"Default is 1.",
|
||||
)
|
||||
from_date: str = Field(
|
||||
description="The start of the aggregate time window. "
|
||||
"Either a date with the format YYYY-MM-DD or "
|
||||
"a millisecond timestamp.",
|
||||
)
|
||||
to_date: str = Field(
|
||||
description="The end of the aggregate time window. "
|
||||
"Either a date with the format YYYY-MM-DD or "
|
||||
"a millisecond timestamp.",
|
||||
)
|
||||
|
||||
|
||||
class PolygonAggregates(BaseTool):
|
||||
"""
|
||||
Tool that gets aggregate bars (stock prices) over a
|
||||
given date range for a given ticker from Polygon.
|
||||
"""
|
||||
|
||||
mode: str = "get_aggregates"
|
||||
name: str = "polygon_aggregates"
|
||||
description: str = (
|
||||
"A wrapper around Polygon's Aggregates API. "
|
||||
"This tool is useful for fetching aggregate bars (stock prices) for a ticker. "
|
||||
"Input should be the ticker, date range, timespan, and timespan multiplier"
|
||||
" that you want to get the aggregate bars for."
|
||||
)
|
||||
args_schema: Type[PolygonAggregatesSchema] = PolygonAggregatesSchema
|
||||
|
||||
api_wrapper: PolygonAPIWrapper
|
||||
|
||||
def _run(
|
||||
self,
|
||||
ticker: str,
|
||||
timespan: str,
|
||||
timespan_multiplier: int,
|
||||
from_date: str,
|
||||
to_date: str,
|
||||
run_manager: Optional[CallbackManagerForToolRun] = None,
|
||||
) -> str:
|
||||
"""Use the Polygon API tool."""
|
||||
return self.api_wrapper.run(
|
||||
mode=self.mode,
|
||||
ticker=ticker,
|
||||
timespan=timespan,
|
||||
timespan_multiplier=timespan_multiplier,
|
||||
from_date=from_date,
|
||||
to_date=to_date,
|
||||
)
|
@@ -3,7 +3,7 @@ Util that calls several of Polygon's stock market REST APIs.
|
||||
Docs: https://polygon.io/docs/stocks/getting-started
|
||||
"""
|
||||
import json
|
||||
from typing import Dict, Optional
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import requests
|
||||
from langchain_core.pydantic_v1 import BaseModel, root_validator
|
||||
@@ -31,6 +31,8 @@ class PolygonAPIWrapper(BaseModel):
|
||||
"""
|
||||
Get fundamental financial data, which is found in balance sheets,
|
||||
income statements, and cash flow statements for a given ticker.
|
||||
|
||||
/vX/reference/financials
|
||||
"""
|
||||
url = (
|
||||
f"{POLYGON_BASE_URL}vX/reference/financials?"
|
||||
@@ -47,7 +49,11 @@ class PolygonAPIWrapper(BaseModel):
|
||||
return data.get("results", None)
|
||||
|
||||
def get_last_quote(self, ticker: str) -> Optional[dict]:
|
||||
"""Get the most recent National Best Bid and Offer (Quote) for a ticker."""
|
||||
"""
|
||||
Get the most recent National Best Bid and Offer (Quote) for a ticker.
|
||||
|
||||
/v2/last/nbbo/{ticker}
|
||||
"""
|
||||
url = f"{POLYGON_BASE_URL}v2/last/nbbo/{ticker}?apiKey={self.polygon_api_key}"
|
||||
response = requests.get(url)
|
||||
data = response.json()
|
||||
@@ -62,6 +68,8 @@ class PolygonAPIWrapper(BaseModel):
|
||||
"""
|
||||
Get the most recent news articles relating to a stock ticker symbol,
|
||||
including a summary of the article and a link to the original source.
|
||||
|
||||
/v2/reference/news
|
||||
"""
|
||||
url = (
|
||||
f"{POLYGON_BASE_URL}v2/reference/news?"
|
||||
@@ -77,12 +85,48 @@ class PolygonAPIWrapper(BaseModel):
|
||||
|
||||
return data.get("results", None)
|
||||
|
||||
def run(self, mode: str, ticker: str) -> str:
|
||||
def get_aggregates(self, ticker: str, **kwargs: Any) -> Optional[dict]:
|
||||
"""
|
||||
Get aggregate bars for a stock over a given date range
|
||||
in custom time window sizes.
|
||||
|
||||
/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}
|
||||
"""
|
||||
timespan = kwargs.get("timespan", "day")
|
||||
multiplier = kwargs.get("timespan_multiplier", 1)
|
||||
from_date = kwargs.get("from_date", None)
|
||||
to_date = kwargs.get("to_date", None)
|
||||
adjusted = kwargs.get("adjusted", True)
|
||||
sort = kwargs.get("sort", "asc")
|
||||
|
||||
url = (
|
||||
f"{POLYGON_BASE_URL}v2/aggs"
|
||||
f"/ticker/{ticker}"
|
||||
f"/range/{multiplier}"
|
||||
f"/{timespan}"
|
||||
f"/{from_date}"
|
||||
f"/{to_date}"
|
||||
f"?apiKey={self.polygon_api_key}"
|
||||
f"&adjusted={adjusted}"
|
||||
f"&sort={sort}"
|
||||
)
|
||||
response = requests.get(url)
|
||||
data = response.json()
|
||||
|
||||
status = data.get("status", None)
|
||||
if status != "OK":
|
||||
raise ValueError(f"API Error: {data}")
|
||||
|
||||
return data.get("results", None)
|
||||
|
||||
def run(self, mode: str, ticker: str, **kwargs: Any) -> str:
|
||||
if mode == "get_financials":
|
||||
return json.dumps(self.get_financials(ticker))
|
||||
elif mode == "get_last_quote":
|
||||
return json.dumps(self.get_last_quote(ticker))
|
||||
elif mode == "get_ticker_news":
|
||||
return json.dumps(self.get_ticker_news(ticker))
|
||||
elif mode == "get_aggregates":
|
||||
return json.dumps(self.get_aggregates(ticker, **kwargs))
|
||||
else:
|
||||
raise ValueError(f"Invalid mode {mode} for Polygon API.")
|
||||
|
Reference in New Issue
Block a user