mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-04 04:28:58 +00:00
community[major], core[patch], langchain[patch], experimental[patch]: Create langchain-community (#14463)
Moved the following modules to new package langchain-community in a backwards compatible fashion: ``` mv langchain/langchain/adapters community/langchain_community mv langchain/langchain/callbacks community/langchain_community/callbacks mv langchain/langchain/chat_loaders community/langchain_community mv langchain/langchain/chat_models community/langchain_community mv langchain/langchain/document_loaders community/langchain_community mv langchain/langchain/docstore community/langchain_community mv langchain/langchain/document_transformers community/langchain_community mv langchain/langchain/embeddings community/langchain_community mv langchain/langchain/graphs community/langchain_community mv langchain/langchain/llms community/langchain_community mv langchain/langchain/memory/chat_message_histories community/langchain_community mv langchain/langchain/retrievers community/langchain_community mv langchain/langchain/storage community/langchain_community mv langchain/langchain/tools community/langchain_community mv langchain/langchain/utilities community/langchain_community mv langchain/langchain/vectorstores community/langchain_community mv langchain/langchain/agents/agent_toolkits community/langchain_community mv langchain/langchain/cache.py community/langchain_community mv langchain/langchain/adapters community/langchain_community mv langchain/langchain/callbacks community/langchain_community/callbacks mv langchain/langchain/chat_loaders community/langchain_community mv langchain/langchain/chat_models community/langchain_community mv langchain/langchain/document_loaders community/langchain_community mv langchain/langchain/docstore community/langchain_community mv langchain/langchain/document_transformers community/langchain_community mv langchain/langchain/embeddings community/langchain_community mv langchain/langchain/graphs community/langchain_community mv langchain/langchain/llms community/langchain_community mv langchain/langchain/memory/chat_message_histories 
community/langchain_community mv langchain/langchain/retrievers community/langchain_community mv langchain/langchain/storage community/langchain_community mv langchain/langchain/tools community/langchain_community mv langchain/langchain/utilities community/langchain_community mv langchain/langchain/vectorstores community/langchain_community mv langchain/langchain/agents/agent_toolkits community/langchain_community mv langchain/langchain/cache.py community/langchain_community ``` Moved the following to core ``` mv langchain/langchain/utils/json_schema.py core/langchain_core/utils mv langchain/langchain/utils/html.py core/langchain_core/utils mv langchain/langchain/utils/strings.py core/langchain_core/utils cat langchain/langchain/utils/env.py >> core/langchain_core/utils/env.py rm langchain/langchain/utils/env.py ``` See .scripts/community_split/script_integrations.sh for all changes
This commit is contained in:
429
libs/community/langchain_community/document_loaders/youtube.py
Normal file
429
libs/community/langchain_community/document_loaders/youtube.py
Normal file
@@ -0,0 +1,429 @@
|
||||
"""Loads YouTube transcript."""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Sequence, Union
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
from langchain_core.documents import Document
|
||||
from langchain_core.pydantic_v1 import root_validator
|
||||
from langchain_core.pydantic_v1.dataclasses import dataclass
|
||||
|
||||
from langchain_community.document_loaders.base import BaseLoader
|
||||
|
||||
# Module-level logger, named after this module so applications can configure
# its output through the standard ``logging`` hierarchy.
logger = logging.getLogger(__name__)

# OAuth scopes requested when loading or creating user credentials:
# read-only access to the YouTube API.
SCOPES = ["https://www.googleapis.com/auth/youtube.readonly"]
|
||||
|
||||
|
||||
@dataclass
class GoogleApiClient:
    """Generic Google API Client.

    To use, you should have the ``google_auth_oauthlib,youtube_transcript_api,google``
    python package installed.
    As the google api expects credentials you need to set up a google account and
    register your Service. "https://developers.google.com/docs/api/quickstart/python"

    Credentials are resolved once, when the instance is created, and stored on
    ``self.creds``. The lookup order is:

    1. ``service_account_path``, if that file exists.
    2. ``token_path``, if that file exists (a previously saved user token).
    3. Otherwise an expired token is refreshed, or an interactive OAuth flow is
       started from ``credentials_path``; the resulting token is written to
       ``token_path``.

    Note that ``credentials_path`` and ``service_account_path`` default to the
    same file.

    Example:
        .. code-block:: python

            from langchain_community.document_loaders import GoogleApiClient
            google_api_client = GoogleApiClient(
                service_account_path=Path("path_to_your_sec_file.json")
            )

    """

    # OAuth client-secrets file used for the interactive user flow.
    credentials_path: Path = Path.home() / ".credentials" / "credentials.json"
    # Service-account key file; takes precedence when it exists.
    service_account_path: Path = Path.home() / ".credentials" / "credentials.json"
    # Where the user token is cached between runs.
    token_path: Path = Path.home() / ".credentials" / "token.json"

    def __post_init__(self) -> None:
        """Resolve credentials eagerly and keep them on ``self.creds``."""
        self.creds = self._load_credentials()

    @root_validator
    def validate_channel_or_videoIds_is_set(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate that at least one credentials file path is configured.

        The method name is kept for backwards compatibility.

        Args:
            values: Field values collected for the instance.

        Returns:
            The unchanged ``values`` mapping.

        Raises:
            ValueError: If both ``credentials_path`` and ``service_account_path``
                are empty.
        """
        if not values.get("credentials_path") and not values.get(
            "service_account_path"
        ):
            raise ValueError(
                "Must specify either credentials_path or service_account_path"
            )
        return values

    def _load_credentials(self) -> Any:
        """Load credentials.

        Returns:
            Service-account credentials when ``service_account_path`` exists,
            otherwise user credentials (loaded from ``token_path``, refreshed,
            or obtained through a local-server OAuth flow).

        Raises:
            ImportError: If one of the required Google / transcript packages is
                not installed.
        """
        # Adapted from https://developers.google.com/drive/api/v3/quickstart/python
        try:
            from google.auth.transport.requests import Request
            from google.oauth2 import service_account
            from google.oauth2.credentials import Credentials
            from google_auth_oauthlib.flow import InstalledAppFlow
            from youtube_transcript_api import YouTubeTranscriptApi  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "You must run "
                "`pip install --upgrade "
                "google-api-python-client google-auth-httplib2 "
                "google-auth-oauthlib "
                "youtube-transcript-api` "
                "to use the YouTube loader"
            ) from e

        # A service-account key wins over any cached user token.
        if self.service_account_path.exists():
            return service_account.Credentials.from_service_account_file(
                str(self.service_account_path)
            )

        creds = None
        if self.token_path.exists():
            creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    str(self.credentials_path), SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Make sure the cache directory exists so a completed OAuth flow is
            # not lost to a FileNotFoundError when a custom token_path is used.
            self.token_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.token_path, "w") as token:
                token.write(creds.to_json())

        return creds
|
||||
|
||||
|
||||
ALLOWED_SCHEMAS = {"http", "https"}
|
||||
ALLOWED_NETLOCK = {
|
||||
"youtu.be",
|
||||
"m.youtube.com",
|
||||
"youtube.com",
|
||||
"www.youtube.com",
|
||||
"www.youtube-nocookie.com",
|
||||
"vid.plus",
|
||||
}
|
||||
|
||||
|
||||
def _parse_video_id(url: str) -> Optional[str]:
|
||||
"""Parse a youtube url and return the video id if valid, otherwise None."""
|
||||
parsed_url = urlparse(url)
|
||||
|
||||
if parsed_url.scheme not in ALLOWED_SCHEMAS:
|
||||
return None
|
||||
|
||||
if parsed_url.netloc not in ALLOWED_NETLOCK:
|
||||
return None
|
||||
|
||||
path = parsed_url.path
|
||||
|
||||
if path.endswith("/watch"):
|
||||
query = parsed_url.query
|
||||
parsed_query = parse_qs(query)
|
||||
if "v" in parsed_query:
|
||||
ids = parsed_query["v"]
|
||||
video_id = ids if isinstance(ids, str) else ids[0]
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
path = parsed_url.path.lstrip("/")
|
||||
video_id = path.split("/")[-1]
|
||||
|
||||
if len(video_id) != 11: # Video IDs are 11 characters long
|
||||
return None
|
||||
|
||||
return video_id
|
||||
|
||||
|
||||
class YoutubeLoader(BaseLoader):
    """Load `YouTube` transcripts."""

    def __init__(
        self,
        video_id: str,
        add_video_info: bool = False,
        language: Union[str, Sequence[str]] = "en",
        translation: Optional[str] = None,
        continue_on_failure: bool = False,
    ):
        """Initialize with YouTube video ID.

        Args:
            video_id: Id of the video whose transcript is loaded.
            add_video_info: If True, ``load`` adds video details obtained via
                ``pytube`` (title, description, ...) to the document metadata.
            language: Language code, or sequence of codes, passed to the
                transcript lookup. A single string is wrapped in a list.
            translation: If given, the transcript that was found is translated
                to this language before it is fetched.
            continue_on_failure: Stored on the instance; not consulted by any
                method of this class.
        """
        self.video_id = video_id
        self.add_video_info = add_video_info
        self.language = [language] if isinstance(language, str) else language
        self.translation = translation
        self.continue_on_failure = continue_on_failure

    @staticmethod
    def extract_video_id(youtube_url: str) -> str:
        """Extract video id from common YT urls.

        Raises:
            ValueError: If no video id can be determined from ``youtube_url``.
        """
        video_id = _parse_video_id(youtube_url)
        if not video_id:
            raise ValueError(
                f"Could not determine the video ID for the URL {youtube_url}"
            )
        return video_id

    @classmethod
    def from_youtube_url(cls, youtube_url: str, **kwargs: Any) -> YoutubeLoader:
        """Given youtube URL, load video.

        Args:
            youtube_url: URL from which the video id is extracted.
            **kwargs: Forwarded unchanged to the constructor.
        """
        video_id = cls.extract_video_id(youtube_url)
        return cls(video_id, **kwargs)

    def load(self) -> List[Document]:
        """Load documents.

        Returns:
            A list with one ``Document`` whose content is the transcript text
            joined with single spaces, or an empty list when transcripts are
            disabled for the video.

        Raises:
            ImportError: If ``youtube_transcript_api`` (or ``pytube`` when
                ``add_video_info`` is set) is not installed.
        """
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
                YouTubeTranscriptApi,
            )
        except ImportError as e:
            raise ImportError(
                "Could not import youtube_transcript_api python package. "
                "Please install it with `pip install youtube-transcript-api`."
            ) from e

        metadata = {"source": self.video_id}

        if self.add_video_info:
            # Get more video meta info
            # Such as title, description, thumbnail url, publish_date
            video_info = self._get_video_info()
            metadata.update(video_info)

        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id)
        except TranscriptsDisabled:
            return []

        try:
            transcript = transcript_list.find_transcript(self.language)
        except NoTranscriptFound:
            # Fall back to English when none of the requested languages exist.
            transcript = transcript_list.find_transcript(["en"])

        if self.translation is not None:
            transcript = transcript.translate(self.translation)

        transcript_pieces = transcript.fetch()
        text = " ".join([t["text"].strip(" ") for t in transcript_pieces])

        return [Document(page_content=text, metadata=metadata)]

    def _get_video_info(self) -> dict:
        """Get important video information.

        Components are:
            - title
            - description
            - thumbnail url,
            - publish_date
            - channel_author
            - and more.

        Missing text values are reported as ``"Unknown"`` and missing numeric
        values as ``0``.

        Raises:
            ImportError: If ``pytube`` is not installed.
        """
        try:
            from pytube import YouTube
        except ImportError as e:
            raise ImportError(
                "Could not import pytube python package. "
                "Please install it with `pip install pytube`."
            ) from e

        yt = YouTube(f"https://www.youtube.com/watch?v={self.video_id}")
        video_info = {
            "title": yt.title or "Unknown",
            "description": yt.description or "Unknown",
            "view_count": yt.views or 0,
            "thumbnail_url": yt.thumbnail_url or "Unknown",
            "publish_date": yt.publish_date.strftime("%Y-%m-%d %H:%M:%S")
            if yt.publish_date
            else "Unknown",
            "length": yt.length or 0,
            "author": yt.author or "Unknown",
        }
        return video_info
|
||||
|
||||
|
||||
@dataclass
class GoogleApiYoutubeLoader(BaseLoader):
    """Load all Videos from a `YouTube` Channel.

    To use, you should have the ``googleapiclient,youtube_transcript_api``
    python package installed.
    As the service needs a google_api_client, you first have to initialize
    the GoogleApiClient.

    Additionally you have to either provide a channel name or a list of videoids
    "https://developers.google.com/docs/api/quickstart/python"

    If both ``channel_name`` and ``video_ids`` are given, ``load`` uses the
    channel and ignores ``video_ids``.

    Example:
        .. code-block:: python

            from langchain_community.document_loaders import GoogleApiClient
            from langchain_community.document_loaders import GoogleApiYoutubeLoader
            google_api_client = GoogleApiClient(
                service_account_path=Path("path_to_your_sec_file.json")
            )
            loader = GoogleApiYoutubeLoader(
                google_api_client=google_api_client,
                channel_name = "CodeAesthetic"
            )
            loader.load()

    """

    # Client whose ``creds`` are used to build the YouTube API client.
    google_api_client: GoogleApiClient
    # Channel to search for; its videos are loaded.
    channel_name: Optional[str] = None
    # Explicit video ids to load when no channel name is given.
    video_ids: Optional[List[str]] = None
    # Channel mode only: merge the search result's snippet into the metadata.
    add_video_info: bool = True
    # Language code requested for captions.
    captions_language: str = "en"
    # Channel mode only: log and skip videos without a usable transcript.
    continue_on_failure: bool = False

    def __post_init__(self) -> None:
        """Build the YouTube API client from the Google client's credentials."""
        self.youtube_client = self._build_youtube_client(self.google_api_client.creds)

    def _build_youtube_client(self, creds: Any) -> Any:
        """Return a YouTube Data API v3 client built with ``creds``.

        Raises:
            ImportError: If the Google API client or ``youtube_transcript_api``
                is not installed.
        """
        try:
            from googleapiclient.discovery import build
            from youtube_transcript_api import YouTubeTranscriptApi  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "You must run "
                "`pip install --upgrade "
                "google-api-python-client google-auth-httplib2 "
                "google-auth-oauthlib "
                "youtube-transcript-api` "
                "to use the YouTube loader"
            ) from e

        return build("youtube", "v3", credentials=creds)

    @root_validator
    def validate_channel_or_videoIds_is_set(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate that at least one of channel_name or video_ids is set.

        Raises:
            ValueError: If both ``channel_name`` and ``video_ids`` are empty.
        """
        if not values.get("channel_name") and not values.get("video_ids"):
            raise ValueError("Must specify either channel_name or video_ids")
        return values

    def _get_transcripe_for_video_id(self, video_id: str) -> str:
        """Return the transcript text of ``video_id`` in ``captions_language``.

        A transcript in the requested language is used when one exists.
        Otherwise the first transcript the list yields is translated to the
        requested language.

        Raises:
            NoTranscriptFound: If the video has no transcript at all.
        """
        from youtube_transcript_api import NoTranscriptFound, YouTubeTranscriptApi

        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            transcript = transcript_list.find_transcript([self.captions_language])
        except NoTranscriptFound:
            fallback = next(iter(transcript_list), None)
            if fallback is None:
                # Nothing to translate: surface the original error, which the
                # channel loader knows how to handle.
                raise
            transcript = fallback.translate(self.captions_language)

        transcript_pieces = transcript.fetch()
        return " ".join([t["text"].strip(" ") for t in transcript_pieces])

    def _get_document_for_video_id(self, video_id: str, **kwargs: Any) -> Document:
        """Build a ``Document`` for one video.

        The page content is the transcript; the metadata is the first item of
        the ``videos().list`` response (``id`` and ``snippet`` parts).

        Args:
            video_id: Id of the video to load.
            **kwargs: Accepted for compatibility; not used.

        Raises:
            ValueError: If the API returns no item for ``video_id``.
        """
        captions = self._get_transcripe_for_video_id(video_id)
        video_response = (
            self.youtube_client.videos()
            .list(
                part="id,snippet",
                id=video_id,
            )
            .execute()
        )
        items = video_response.get("items") or []
        if not items:
            raise ValueError(f"No YouTube video found for id {video_id!r}")
        return Document(
            page_content=captions,
            metadata=items[0],
        )

    def _get_channel_id(self, channel_name: str) -> str:
        """Return the channel id of the top search hit for ``channel_name``.

        Raises:
            ValueError: If the search returns no channel.
        """
        request = self.youtube_client.search().list(
            part="id",
            q=channel_name,
            type="channel",
            maxResults=1,  # only the top search hit is used
        )
        response = request.execute()
        items = response.get("items") or []
        if not items:
            raise ValueError(f"No YouTube channel found for name {channel_name!r}")
        return items[0]["id"]["channelId"]

    def _get_document_for_channel(self, channel: str, **kwargs: Any) -> List[Document]:
        """Load one ``Document`` per video found for ``channel``.

        Pages through the channel's search results and fetches a transcript
        for every result that carries a ``videoId``.

        Args:
            channel: Channel name, resolved to an id via a channel search.
            **kwargs: Accepted for compatibility; not used.

        Raises:
            ImportError: If ``youtube_transcript_api`` is not installed.
            TranscriptsDisabled, NoTranscriptFound: If a transcript cannot be
                fetched and ``continue_on_failure`` is False.
        """
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
            )
        except ImportError as e:
            raise ImportError(
                "You must run "
                "`pip install --upgrade "
                "youtube-transcript-api` "
                "to use the youtube loader"
            ) from e

        channel_id = self._get_channel_id(channel)
        request = self.youtube_client.search().list(
            part="id,snippet",
            channelId=channel_id,
            maxResults=50,  # adjust this value to retrieve more or fewer videos
        )
        documents = []
        while request is not None:
            response = request.execute()

            for item in response["items"]:
                video_id = item["id"].get("videoId")
                if not video_id:
                    # Search results without a videoId are skipped.
                    continue
                meta_data = {"videoId": video_id}
                if self.add_video_info:
                    # Thumbnails are dropped from the metadata when present.
                    item["snippet"].pop("thumbnails", None)
                    meta_data.update(item["snippet"])
                try:
                    page_content = self._get_transcripe_for_video_id(video_id)
                except (TranscriptsDisabled, NoTranscriptFound) as e:
                    if not self.continue_on_failure:
                        raise
                    logger.error(
                        "Error fetching transscript "
                        + f" {video_id}, exception: {e}"
                    )
                    continue
                documents.append(
                    Document(
                        page_content=page_content,
                        metadata=meta_data,
                    )
                )
            # list_next returns None once there are no further pages.
            request = self.youtube_client.search().list_next(request, response)

        return documents

    def load(self) -> List[Document]:
        """Load documents.

        Returns:
            Documents for the channel when ``channel_name`` is set, otherwise
            one document per entry of ``video_ids``.

        Raises:
            ValueError: If neither ``channel_name`` nor ``video_ids`` is set.
        """
        document_list = []
        if self.channel_name:
            document_list.extend(self._get_document_for_channel(self.channel_name))
        elif self.video_ids:
            document_list.extend(
                [
                    self._get_document_for_video_id(video_id)
                    for video_id in self.video_ids
                ]
            )
        else:
            raise ValueError("Must specify either channel_name or video_ids")
        return document_list
|
Reference in New Issue
Block a user