Files
langchain/libs/community/langchain_community/document_loaders/youtube.py
Pasha 4e189cd89a community[patch]: youtube loader transcript format (#16625)
- **Description**: YoutubeLoader right now returns one document that
contains the entire transcript. I think it would be useful to add an
option to return multiple documents, where each document would contain
one line of transcript with the start time and duration in the metadata.
For example,
[AssemblyAIAudioTranscriptLoader](https://github.com/langchain-ai/langchain/blob/master/libs/community/langchain_community/document_loaders/assemblyai.py)
is implemented in a similar way: it lets you choose which format the
document loader should use.
2024-01-26 15:26:09 -08:00

448 lines
15 KiB
Python

"""Loads YouTube transcript."""
from __future__ import annotations
import logging
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union
from urllib.parse import parse_qs, urlparse
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.pydantic_v1.dataclasses import dataclass
from langchain_community.document_loaders.base import BaseLoader
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# OAuth scopes requested when loading or creating user credentials below
# (read-only access to the YouTube API).
SCOPES = ["https://www.googleapis.com/auth/youtube.readonly"]
@dataclass
class GoogleApiClient:
    """Generic Google API Client.

    To use, you should have the ``google_auth_oauthlib,youtube_transcript_api,google``
    python package installed.
    As the google api expects credentials you need to set up a google account and
    register your Service. "https://developers.google.com/docs/api/quickstart/python"

    Credentials are loaded once, right after construction, and stored on
    ``self.creds``.

    Example:
        .. code-block:: python

            from langchain_community.document_loaders import GoogleApiClient
            google_api_client = GoogleApiClient(
                service_account_path=Path("path_to_your_sec_file.json")
            )

    """

    # OAuth client-secrets file used to start the interactive login flow.
    credentials_path: Path = Path.home() / ".credentials" / "credentials.json"
    # Service-account key file; when it exists it takes precedence over
    # every other credential source.
    service_account_path: Path = Path.home() / ".credentials" / "credentials.json"
    # Where the authorized-user token is read from and written back to.
    token_path: Path = Path.home() / ".credentials" / "token.json"

    def __post_init__(self) -> None:
        """Load the credentials eagerly and keep them on ``self.creds``."""
        self.creds = self._load_credentials()

    @root_validator
    def validate_channel_or_videoIds_is_set(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate that a credentials source is configured.

        The method name is historical; what is checked is that at least one
        of ``credentials_path`` or ``service_account_path`` is set.

        Raises:
            ValueError: if both ``credentials_path`` and
                ``service_account_path`` are missing or falsy.
        """
        if not values.get("credentials_path") and not values.get(
            "service_account_path"
        ):
            raise ValueError(
                "Must specify either credentials_path or service_account_path"
            )
        return values

    def _load_credentials(self) -> Any:
        """Load credentials.

        Resolution order:

        1. If ``service_account_path`` exists, return service-account
           credentials built from that file.
        2. Otherwise, if ``token_path`` exists, load the authorized-user
           credentials stored there.
        3. If those are missing or not valid, refresh them when they are
           expired and carry a refresh token; otherwise run the local-server
           OAuth flow based on ``credentials_path``. The resulting
           credentials are written to ``token_path``.

        Returns:
            The credentials object produced by the Google auth libraries.

        Raises:
            ImportError: if the required Google / transcript packages are
                not installed.
        """
        # Adapted from https://developers.google.com/drive/api/v3/quickstart/python
        try:
            from google.auth.transport.requests import Request
            from google.oauth2 import service_account
            from google.oauth2.credentials import Credentials
            from google_auth_oauthlib.flow import InstalledAppFlow
            from youtube_transcript_api import YouTubeTranscriptApi  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "You must run"
                "`pip install --upgrade "
                "google-api-python-client google-auth-httplib2 "
                "google-auth-oauthlib "
                "youtube-transcript-api` "
                "to use the youtube loader"
            ) from e

        if self.service_account_path.exists():
            return service_account.Credentials.from_service_account_file(
                str(self.service_account_path)
            )

        creds = None
        if self.token_path.exists():
            creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    str(self.credentials_path), SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Persist the refreshed / newly obtained token for the next run.
            with open(self.token_path, "w") as token:
                token.write(creds.to_json())

        return creds
ALLOWED_SCHEMAS = {"http", "https"}
ALLOWED_NETLOCK = {
"youtu.be",
"m.youtube.com",
"youtube.com",
"www.youtube.com",
"www.youtube-nocookie.com",
"vid.plus",
}
def _parse_video_id(url: str) -> Optional[str]:
"""Parse a youtube url and return the video id if valid, otherwise None."""
parsed_url = urlparse(url)
if parsed_url.scheme not in ALLOWED_SCHEMAS:
return None
if parsed_url.netloc not in ALLOWED_NETLOCK:
return None
path = parsed_url.path
if path.endswith("/watch"):
query = parsed_url.query
parsed_query = parse_qs(query)
if "v" in parsed_query:
ids = parsed_query["v"]
video_id = ids if isinstance(ids, str) else ids[0]
else:
return None
else:
path = parsed_url.path.lstrip("/")
video_id = path.split("/")[-1]
if len(video_id) != 11: # Video IDs are 11 characters long
return None
return video_id
class TranscriptFormat(Enum):
    """Output layouts a transcript loader can be asked to produce."""

    # The whole transcript joined into one piece of text.
    TEXT = "text"

    # The transcript kept as its individual lines.
    LINES = "lines"
class YoutubeLoader(BaseLoader):
    """Load `YouTube` transcripts."""

    def __init__(
        self,
        video_id: str,
        add_video_info: bool = False,
        language: Union[str, Sequence[str]] = "en",
        translation: Optional[str] = None,
        transcript_format: TranscriptFormat = TranscriptFormat.TEXT,
        continue_on_failure: bool = False,
    ):
        """Initialize with YouTube video ID.

        Args:
            video_id: Id of the video whose transcript is loaded.
            add_video_info: When true, ``load`` also calls
                ``_get_video_info`` and merges the result into the metadata.
            language: One language code or a sequence of codes handed to the
                transcript lookup. A single string is wrapped in a list.
            translation: When not ``None``, the transcript that was found is
                translated to this language before it is fetched.
            transcript_format: ``TranscriptFormat.TEXT`` yields one document
                with the joined transcript; ``TranscriptFormat.LINES`` yields
                one document per transcript piece.
            continue_on_failure: Stored on the instance.
                NOTE(review): no method of this class reads it.
        """
        self.video_id = video_id
        self.add_video_info = add_video_info
        # Always keep a sequence of language codes on the instance.
        self.language = [language] if isinstance(language, str) else language
        self.translation = translation
        self.transcript_format = transcript_format
        self.continue_on_failure = continue_on_failure

    @staticmethod
    def extract_video_id(youtube_url: str) -> str:
        """Extract video id from common YT urls.

        Raises:
            ValueError: if no video id can be determined from the URL.
        """
        video_id = _parse_video_id(youtube_url)
        if not video_id:
            raise ValueError(
                f"Could not determine the video ID for the URL {youtube_url}"
            )
        return video_id

    @classmethod
    def from_youtube_url(cls, youtube_url: str, **kwargs: Any) -> YoutubeLoader:
        """Given youtube URL, load video.

        Extra keyword arguments are forwarded to the constructor.
        """
        video_id = cls.extract_video_id(youtube_url)
        return cls(video_id, **kwargs)

    def load(self) -> List[Document]:
        """Load documents.

        Returns:
            An empty list when transcripts are disabled for the video;
            otherwise one document (``TEXT``) or one document per transcript
            piece (``LINES``).

        Raises:
            ImportError: if ``youtube_transcript_api`` is not installed.
            ValueError: if ``transcript_format`` is not a known format.
        """
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
                YouTubeTranscriptApi,
            )
        except ImportError as e:
            raise ImportError(
                "Could not import youtube_transcript_api python package. "
                "Please install it with `pip install youtube-transcript-api`."
            ) from e

        metadata: Dict[str, Any] = {"source": self.video_id}

        if self.add_video_info:
            # Get more video meta info
            # Such as title, description, thumbnail url, publish_date
            metadata.update(self._get_video_info())

        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id)
        except TranscriptsDisabled:
            return []

        try:
            transcript = transcript_list.find_transcript(self.language)
        except NoTranscriptFound:
            # Fall back to English when none of the requested languages exist.
            transcript = transcript_list.find_transcript(["en"])

        if self.translation is not None:
            transcript = transcript.translate(self.translation)

        transcript_pieces = transcript.fetch()

        if self.transcript_format == TranscriptFormat.TEXT:
            text = " ".join(piece["text"].strip(" ") for piece in transcript_pieces)
            return [Document(page_content=text, metadata=metadata)]

        if self.transcript_format == TranscriptFormat.LINES:
            # Each document's metadata is every field of the piece except
            # its text.
            # NOTE(review): the video-level ``metadata`` built above is not
            # attached to these documents - confirm that this is intended.
            return [
                Document(
                    page_content=piece["text"].strip(" "),
                    metadata={key: piece[key] for key in piece if key != "text"},
                )
                for piece in transcript_pieces
            ]

        raise ValueError("Unknown transcript format.")

    def _get_video_info(self) -> dict:
        """Get important video information.

        Components are:
            - title
            - description
            - thumbnail url,
            - publish_date
            - channel_author
            - and more.

        Falsy values reported by ``pytube`` are replaced by ``"Unknown"``
        (text fields) or ``0`` (``view_count`` and ``length``).

        Raises:
            ImportError: if ``pytube`` is not installed.
        """
        try:
            from pytube import YouTube
        except ImportError as e:
            raise ImportError(
                "Could not import pytube python package. "
                "Please install it with `pip install pytube`."
            ) from e

        yt = YouTube(f"https://www.youtube.com/watch?v={self.video_id}")
        if yt.publish_date:
            publish_date = yt.publish_date.strftime("%Y-%m-%d %H:%M:%S")
        else:
            publish_date = "Unknown"

        return {
            "title": yt.title or "Unknown",
            "description": yt.description or "Unknown",
            "view_count": yt.views or 0,
            "thumbnail_url": yt.thumbnail_url or "Unknown",
            "publish_date": publish_date,
            "length": yt.length or 0,
            "author": yt.author or "Unknown",
        }
@dataclass
class GoogleApiYoutubeLoader(BaseLoader):
    """Load all Videos from a `YouTube` Channel.

    To use, you should have the ``googleapiclient,youtube_transcript_api``
    python package installed.
    As the service needs a google_api_client, you first have to initialize
    the GoogleApiClient.

    Additionally you have to either provide a channel name or a list of videoids
    "https://developers.google.com/docs/api/quickstart/python"

    Example:
        .. code-block:: python

            from langchain_community.document_loaders import GoogleApiClient
            from langchain_community.document_loaders import GoogleApiYoutubeLoader
            google_api_client = GoogleApiClient(
                service_account_path=Path("path_to_your_sec_file.json")
            )
            loader = GoogleApiYoutubeLoader(
                google_api_client=google_api_client,
                channel_name = "CodeAesthetic"
            )
            loader.load()

    """

    # Client whose ``creds`` are used to build the YouTube API client.
    google_api_client: GoogleApiClient
    # Channel to search for; takes precedence over ``video_ids`` in ``load``.
    channel_name: Optional[str] = None
    # Explicit video ids, used by ``load`` when no channel name is given.
    video_ids: Optional[List[str]] = None
    # When true, channel search snippets are merged into document metadata.
    add_video_info: bool = True
    # Language code requested for captions.
    captions_language: str = "en"
    # When true, transcript errors during a channel load are logged and the
    # video is skipped instead of aborting the load.
    continue_on_failure: bool = False

    def __post_init__(self) -> None:
        """Build the YouTube API client from the Google client's credentials."""
        self.youtube_client = self._build_youtube_client(self.google_api_client.creds)

    def _build_youtube_client(self, creds: Any) -> Any:
        """Return a YouTube Data API v3 client built with ``creds``.

        Raises:
            ImportError: if the Google API client or the transcript package
                is not installed.
        """
        try:
            from googleapiclient.discovery import build
            from youtube_transcript_api import YouTubeTranscriptApi  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "You must run"
                "`pip install --upgrade "
                "google-api-python-client google-auth-httplib2 "
                "google-auth-oauthlib "
                "youtube-transcript-api` "
                "to use the youtube loader"
            ) from e

        return build("youtube", "v3", credentials=creds)

    @root_validator
    def validate_channel_or_videoIds_is_set(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate that at least one of channel_name or video_ids is set.

        Raises:
            ValueError: if both ``channel_name`` and ``video_ids`` are
                missing or empty.
        """
        if not values.get("channel_name") and not values.get("video_ids"):
            raise ValueError("Must specify either channel_name or video_ids")
        return values

    def _get_transcripe_for_video_id(self, video_id: str) -> str:
        """Return the transcript of ``video_id`` as one space-joined string.

        A transcript in ``captions_language`` is preferred. When none is
        found, the first transcript in the listing is translated to
        ``captions_language`` instead.

        Raises:
            NoTranscriptFound: if no transcript in ``captions_language``
                exists and the listing yields no transcript to translate.
        """
        from youtube_transcript_api import NoTranscriptFound, YouTubeTranscriptApi

        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            transcript = transcript_list.find_transcript([self.captions_language])
        except NoTranscriptFound:
            fallback = next(iter(transcript_list), None)
            if fallback is None:
                # Nothing to translate: surface the lookup error instead of
                # failing later on an unbound local.
                raise
            transcript = fallback.translate(self.captions_language)

        transcript_pieces = transcript.fetch()
        return " ".join([t["text"].strip(" ") for t in transcript_pieces])

    def _get_document_for_video_id(self, video_id: str, **kwargs: Any) -> Document:
        """Build one document for ``video_id``.

        The page content is the video's transcript; the metadata is the
        first item of the ``videos().list`` response (parts ``id,snippet``).
        ``kwargs`` is accepted but not used.
        """
        captions = self._get_transcripe_for_video_id(video_id)
        video_response = (
            self.youtube_client.videos()
            .list(
                part="id,snippet",
                id=video_id,
            )
            .execute()
        )
        return Document(
            page_content=captions,
            metadata=video_response.get("items")[0],
        )

    def _get_channel_id(self, channel_name: str) -> str:
        """Return the channel id of the top search hit for ``channel_name``."""
        request = self.youtube_client.search().list(
            part="id",
            q=channel_name,
            type="channel",
            maxResults=1,  # only the top search hit is used
        )
        response = request.execute()
        return response["items"][0]["id"]["channelId"]

    def _get_document_for_channel(self, channel: str, **kwargs: Any) -> List[Document]:
        """Build one document per video found for ``channel``.

        All result pages of the channel search are walked. Items without a
        ``videoId`` are skipped. ``kwargs`` is accepted but not used.

        Raises:
            ImportError: if ``youtube_transcript_api`` is not installed.
            TranscriptsDisabled, NoTranscriptFound: re-raised from the
                transcript lookup unless ``continue_on_failure`` is true.
        """
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
            )
        except ImportError as e:
            raise ImportError(
                "You must run"
                "`pip install --upgrade "
                "youtube-transcript-api` "
                "to use the youtube loader"
            ) from e

        channel_id = self._get_channel_id(channel)
        request = self.youtube_client.search().list(
            part="id,snippet",
            channelId=channel_id,
            maxResults=50,  # adjust this value to retrieve more or fewer videos
        )

        documents: List[Document] = []
        while request is not None:
            response = request.execute()

            for item in response["items"]:
                video_id = item["id"].get("videoId")
                if not video_id:
                    continue

                meta_data = {"videoId": video_id}
                if self.add_video_info:
                    # Thumbnails are dropped from the snippet before it is
                    # merged into the metadata.
                    item["snippet"].pop("thumbnails", None)
                    meta_data.update(item["snippet"])

                try:
                    page_content = self._get_transcripe_for_video_id(video_id)
                except (TranscriptsDisabled, NoTranscriptFound) as e:
                    if not self.continue_on_failure:
                        raise
                    logger.error(
                        "Error fetching transscript "
                        + f" {video_id}, exception: {e}"
                    )
                    continue

                documents.append(
                    Document(
                        page_content=page_content,
                        metadata=meta_data,
                    )
                )

            # ``list_next`` yields the request for the following page; the
            # loop ends once it returns ``None``.
            request = self.youtube_client.search().list_next(request, response)

        return documents

    def load(self) -> List[Document]:
        """Load documents.

        Uses the channel when ``channel_name`` is set, otherwise the
        explicit ``video_ids``.

        Raises:
            ValueError: if neither ``channel_name`` nor ``video_ids`` is set.
        """
        document_list = []
        if self.channel_name:
            document_list.extend(self._get_document_for_channel(self.channel_name))
        elif self.video_ids:
            document_list.extend(
                [
                    self._get_document_for_video_id(video_id)
                    for video_id in self.video_ids
                ]
            )
        else:
            raise ValueError("Must specify either channel_name or video_ids")

        return document_list