mirror of
https://github.com/hwchase17/langchain.git
synced 2025-08-20 09:57:32 +00:00
**Description:** Fixes a bug in the YoutubeLoader where FetchedTranscript objects were not properly processed. The loader was only extracting the 'text' attribute from FetchedTranscriptSnippet objects while ignoring 'start' and 'duration' attributes. This would cause a TypeError when the code later tried to access these missing keys, particularly when using the CHUNKS format or any code path that needed timestamp information. This PR modifies the conversion of FetchedTranscriptSnippet objects to include all necessary attributes, ensuring that the loader works correctly with all transcript formats. **Issue:** Fixes #30309 **Dependencies:** None **Testing:** - Tested the fix with multiple YouTube videos to confirm it resolves the issue - Verified that both regular loading and CHUNKS format work correctly
528 lines
18 KiB
Python
528 lines
18 KiB
Python
"""Loads YouTube transcript."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Generator, List, Optional, Sequence, Union
|
|
from urllib.parse import parse_qs, urlparse
|
|
from xml.etree.ElementTree import ParseError # OK: trusted-source
|
|
|
|
from langchain_core.documents import Document
|
|
from pydantic import model_validator
|
|
from pydantic.dataclasses import dataclass
|
|
|
|
from langchain_community.document_loaders.base import BaseLoader
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SCOPES = ["https://www.googleapis.com/auth/youtube.readonly"]
|
|
|
|
|
|
@dataclass
|
|
class GoogleApiClient:
|
|
"""Generic Google API Client.
|
|
|
|
To use, you should have the ``google_auth_oauthlib,youtube_transcript_api,google``
|
|
python package installed.
|
|
As the google api expects credentials you need to set up a google account and
|
|
register your Service. "https://developers.google.com/docs/api/quickstart/python"
|
|
|
|
*Security Note*: Note that parsing of the transcripts relies on the standard
|
|
xml library but the input is viewed as trusted in this case.
|
|
|
|
|
|
Example:
|
|
.. code-block:: python
|
|
|
|
from langchain_community.document_loaders import GoogleApiClient
|
|
google_api_client = GoogleApiClient(
|
|
service_account_path=Path("path_to_your_sec_file.json")
|
|
)
|
|
|
|
"""
|
|
|
|
credentials_path: Path = Path.home() / ".credentials" / "credentials.json"
|
|
service_account_path: Path = Path.home() / ".credentials" / "credentials.json"
|
|
token_path: Path = Path.home() / ".credentials" / "token.json"
|
|
|
|
def __post_init__(self) -> None:
|
|
self.creds = self._load_credentials()
|
|
|
|
@model_validator(mode="before")
|
|
@classmethod
|
|
def validate_channel_or_videoIds_is_set(cls, values: Any) -> Any:
|
|
"""Validate that either folder_id or document_ids is set, but not both."""
|
|
|
|
if not values.kwargs.get("credentials_path") and not values.kwargs.get(
|
|
"service_account_path"
|
|
):
|
|
raise ValueError("Must specify either channel_name or video_ids")
|
|
return values.kwargs
|
|
|
|
def _load_credentials(self) -> Any:
|
|
"""Load credentials."""
|
|
# Adapted from https://developers.google.com/drive/api/v3/quickstart/python
|
|
try:
|
|
from google.auth.transport.requests import Request
|
|
from google.oauth2 import service_account
|
|
from google.oauth2.credentials import Credentials
|
|
from google_auth_oauthlib.flow import InstalledAppFlow
|
|
from youtube_transcript_api import YouTubeTranscriptApi # noqa: F401
|
|
except ImportError:
|
|
raise ImportError(
|
|
"You must run"
|
|
"`pip install --upgrade "
|
|
"google-api-python-client google-auth-httplib2 "
|
|
"google-auth-oauthlib "
|
|
"youtube-transcript-api` "
|
|
"to use the Google Drive loader"
|
|
)
|
|
|
|
creds = None
|
|
if self.service_account_path.exists():
|
|
return service_account.Credentials.from_service_account_file(
|
|
str(self.service_account_path)
|
|
)
|
|
if self.token_path.exists():
|
|
creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES)
|
|
|
|
if not creds or not creds.valid:
|
|
if creds and creds.expired and creds.refresh_token:
|
|
creds.refresh(Request())
|
|
else:
|
|
flow = InstalledAppFlow.from_client_secrets_file(
|
|
str(self.credentials_path), SCOPES
|
|
)
|
|
creds = flow.run_local_server(port=0)
|
|
with open(self.token_path, "w") as token:
|
|
token.write(creds.to_json())
|
|
|
|
return creds
|
|
|
|
|
|
ALLOWED_SCHEMES = {"http", "https"}
|
|
ALLOWED_NETLOCS = {
|
|
"youtu.be",
|
|
"m.youtube.com",
|
|
"youtube.com",
|
|
"www.youtube.com",
|
|
"www.youtube-nocookie.com",
|
|
"vid.plus",
|
|
}
|
|
|
|
|
|
def _parse_video_id(url: str) -> Optional[str]:
|
|
"""Parse a YouTube URL and return the video ID if valid, otherwise None."""
|
|
parsed_url = urlparse(url)
|
|
|
|
if parsed_url.scheme not in ALLOWED_SCHEMES:
|
|
return None
|
|
|
|
if parsed_url.netloc not in ALLOWED_NETLOCS:
|
|
return None
|
|
|
|
path = parsed_url.path
|
|
|
|
if path.endswith("/watch"):
|
|
query = parsed_url.query
|
|
parsed_query = parse_qs(query)
|
|
if "v" in parsed_query:
|
|
ids = parsed_query["v"]
|
|
video_id = ids if isinstance(ids, str) else ids[0]
|
|
else:
|
|
return None
|
|
else:
|
|
path = parsed_url.path.lstrip("/")
|
|
video_id = path.split("/")[-1]
|
|
|
|
if len(video_id) != 11: # Video IDs are 11 characters long
|
|
return None
|
|
|
|
return video_id
|
|
|
|
|
|
class TranscriptFormat(Enum):
|
|
"""Output formats of transcripts from `YoutubeLoader`."""
|
|
|
|
TEXT = "text"
|
|
LINES = "lines"
|
|
CHUNKS = "chunks"
|
|
|
|
|
|
class YoutubeLoader(BaseLoader):
|
|
"""Load `YouTube` video transcripts."""
|
|
|
|
def __init__(
|
|
self,
|
|
video_id: str,
|
|
add_video_info: bool = False,
|
|
language: Union[str, Sequence[str]] = "en",
|
|
translation: Optional[str] = None,
|
|
transcript_format: TranscriptFormat = TranscriptFormat.TEXT,
|
|
continue_on_failure: bool = False,
|
|
chunk_size_seconds: int = 120,
|
|
):
|
|
"""Initialize with YouTube video ID."""
|
|
self.video_id = video_id
|
|
self._metadata = {"source": video_id}
|
|
self.add_video_info = add_video_info
|
|
self.language = language
|
|
if isinstance(language, str):
|
|
self.language = [language]
|
|
else:
|
|
self.language = language
|
|
self.translation = translation
|
|
self.transcript_format = transcript_format
|
|
self.continue_on_failure = continue_on_failure
|
|
self.chunk_size_seconds = chunk_size_seconds
|
|
|
|
@staticmethod
|
|
def extract_video_id(youtube_url: str) -> str:
|
|
"""Extract video ID from common YouTube URLs."""
|
|
video_id = _parse_video_id(youtube_url)
|
|
if not video_id:
|
|
raise ValueError(
|
|
f'Could not determine the video ID for the URL "{youtube_url}".'
|
|
)
|
|
return video_id
|
|
|
|
@classmethod
|
|
def from_youtube_url(cls, youtube_url: str, **kwargs: Any) -> YoutubeLoader:
|
|
"""Given a YouTube URL, construct a loader.
|
|
See `YoutubeLoader()` constructor for a list of keyword arguments.
|
|
"""
|
|
video_id = cls.extract_video_id(youtube_url)
|
|
return cls(video_id, **kwargs)
|
|
|
|
def _make_chunk_document(
|
|
self, chunk_pieces: List[Dict], chunk_start_seconds: int
|
|
) -> Document:
|
|
"""Create Document from chunk of transcript pieces."""
|
|
m, s = divmod(chunk_start_seconds, 60)
|
|
h, m = divmod(m, 60)
|
|
return Document(
|
|
page_content=" ".join(
|
|
map(lambda chunk_piece: chunk_piece["text"].strip(" "), chunk_pieces)
|
|
),
|
|
metadata={
|
|
**self._metadata,
|
|
"start_seconds": chunk_start_seconds,
|
|
"start_timestamp": f"{h:02d}:{m:02d}:{s:02d}",
|
|
"source":
|
|
# replace video ID with URL to start time
|
|
f"https://www.youtube.com/watch?v={self.video_id}"
|
|
f"&t={chunk_start_seconds}s",
|
|
},
|
|
)
|
|
|
|
def _get_transcript_chunks(
|
|
self, transcript_pieces: List[Dict]
|
|
) -> Generator[Document, None, None]:
|
|
chunk_pieces: List[Dict[str, Any]] = []
|
|
chunk_start_seconds = 0
|
|
chunk_time_limit = self.chunk_size_seconds
|
|
for transcript_piece in transcript_pieces:
|
|
piece_end = transcript_piece["start"] + transcript_piece["duration"]
|
|
if piece_end > chunk_time_limit:
|
|
if chunk_pieces:
|
|
yield self._make_chunk_document(chunk_pieces, chunk_start_seconds)
|
|
chunk_pieces = []
|
|
chunk_start_seconds = chunk_time_limit
|
|
chunk_time_limit += self.chunk_size_seconds
|
|
|
|
chunk_pieces.append(transcript_piece)
|
|
|
|
if len(chunk_pieces) > 0:
|
|
yield self._make_chunk_document(chunk_pieces, chunk_start_seconds)
|
|
|
|
def load(self) -> List[Document]:
|
|
"""Load YouTube transcripts into `Document` objects."""
|
|
try:
|
|
from youtube_transcript_api import (
|
|
FetchedTranscript,
|
|
NoTranscriptFound,
|
|
TranscriptsDisabled,
|
|
YouTubeTranscriptApi,
|
|
)
|
|
except ImportError:
|
|
raise ImportError(
|
|
'Could not import "youtube_transcript_api" Python package. '
|
|
"Please install it with `pip install youtube-transcript-api`."
|
|
)
|
|
|
|
if self.add_video_info:
|
|
# Get more video meta info
|
|
# Such as title, description, thumbnail url, publish_date
|
|
video_info = self._get_video_info()
|
|
self._metadata.update(video_info)
|
|
|
|
try:
|
|
transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id)
|
|
except TranscriptsDisabled:
|
|
return []
|
|
|
|
try:
|
|
transcript = transcript_list.find_transcript(self.language)
|
|
except NoTranscriptFound:
|
|
transcript = transcript_list.find_transcript(["en"])
|
|
|
|
if self.translation is not None:
|
|
transcript = transcript.translate(self.translation)
|
|
transcript_object = transcript.fetch()
|
|
if isinstance(transcript_object, FetchedTranscript):
|
|
transcript_pieces = [
|
|
{
|
|
"text": snippet.text,
|
|
"start": snippet.start,
|
|
"duration": snippet.duration,
|
|
}
|
|
for snippet in transcript_object.snippets
|
|
]
|
|
else:
|
|
transcript_pieces: List[Dict[str, Any]] = transcript_object # type: ignore[no-redef]
|
|
|
|
if self.transcript_format == TranscriptFormat.TEXT:
|
|
transcript = " ".join(
|
|
map(
|
|
lambda transcript_piece: transcript_piece["text"].strip(" "),
|
|
transcript_pieces,
|
|
)
|
|
)
|
|
return [Document(page_content=transcript, metadata=self._metadata)]
|
|
elif self.transcript_format == TranscriptFormat.LINES:
|
|
return list(
|
|
map(
|
|
lambda transcript_piece: Document(
|
|
page_content=transcript_piece["text"].strip(" "),
|
|
metadata=dict(
|
|
filter(
|
|
lambda item: item[0] != "text", transcript_piece.items()
|
|
)
|
|
),
|
|
),
|
|
transcript_pieces,
|
|
)
|
|
)
|
|
elif self.transcript_format == TranscriptFormat.CHUNKS:
|
|
return list(self._get_transcript_chunks(transcript_pieces))
|
|
|
|
else:
|
|
raise ValueError("Unknown transcript format.")
|
|
|
|
def _get_video_info(self) -> Dict:
|
|
"""Get important video information.
|
|
|
|
Components include:
|
|
- title
|
|
- description
|
|
- thumbnail URL,
|
|
- publish_date
|
|
- channel author
|
|
- and more.
|
|
"""
|
|
try:
|
|
from pytube import YouTube
|
|
|
|
except ImportError:
|
|
raise ImportError(
|
|
'Could not import "pytube" Python package. '
|
|
"Please install it with `pip install pytube`."
|
|
)
|
|
yt = YouTube(f"https://www.youtube.com/watch?v={self.video_id}")
|
|
video_info = {
|
|
"title": yt.title or "Unknown",
|
|
"description": yt.description or "Unknown",
|
|
"view_count": yt.views or 0,
|
|
"thumbnail_url": yt.thumbnail_url or "Unknown",
|
|
"publish_date": yt.publish_date.strftime("%Y-%m-%d %H:%M:%S")
|
|
if yt.publish_date
|
|
else "Unknown",
|
|
"length": yt.length or 0,
|
|
"author": yt.author or "Unknown",
|
|
}
|
|
return video_info
|
|
|
|
|
|
@dataclass
|
|
class GoogleApiYoutubeLoader(BaseLoader):
|
|
"""Load all Videos from a `YouTube` Channel.
|
|
|
|
To use, you should have the ``googleapiclient,youtube_transcript_api``
|
|
python package installed.
|
|
As the service needs a google_api_client, you first have to initialize
|
|
the GoogleApiClient.
|
|
|
|
Additionally you have to either provide a channel name or a list of videoids
|
|
"https://developers.google.com/docs/api/quickstart/python"
|
|
|
|
|
|
|
|
Example:
|
|
.. code-block:: python
|
|
|
|
from langchain_community.document_loaders import GoogleApiClient
|
|
from langchain_community.document_loaders import GoogleApiYoutubeLoader
|
|
google_api_client = GoogleApiClient(
|
|
service_account_path=Path("path_to_your_sec_file.json")
|
|
)
|
|
loader = GoogleApiYoutubeLoader(
|
|
google_api_client=google_api_client,
|
|
channel_name = "CodeAesthetic"
|
|
)
|
|
load.load()
|
|
|
|
"""
|
|
|
|
google_api_client: GoogleApiClient
|
|
channel_name: Optional[str] = None
|
|
video_ids: Optional[List[str]] = None
|
|
add_video_info: bool = True
|
|
captions_language: str = "en"
|
|
continue_on_failure: bool = False
|
|
|
|
def __post_init__(self) -> None:
|
|
self.youtube_client = self._build_youtube_client(self.google_api_client.creds)
|
|
|
|
def _build_youtube_client(self, creds: Any) -> Any:
|
|
try:
|
|
from googleapiclient.discovery import build
|
|
from youtube_transcript_api import YouTubeTranscriptApi # noqa: F401
|
|
except ImportError:
|
|
raise ImportError(
|
|
"You must run"
|
|
"`pip install --upgrade "
|
|
"google-api-python-client google-auth-httplib2 "
|
|
"google-auth-oauthlib "
|
|
"youtube-transcript-api` "
|
|
"to use the Google Drive loader"
|
|
)
|
|
|
|
return build("youtube", "v3", credentials=creds)
|
|
|
|
@model_validator(mode="before")
|
|
@classmethod
|
|
def validate_channel_or_videoIds_is_set(cls, values: Any) -> Any:
|
|
"""Validate that either folder_id or document_ids is set, but not both."""
|
|
if not values.kwargs.get("channel_name") and not values.kwargs.get("video_ids"):
|
|
raise ValueError("Must specify either channel_name or video_ids")
|
|
return values.kwargs
|
|
|
|
def _get_transcripe_for_video_id(self, video_id: str) -> str:
|
|
from youtube_transcript_api import NoTranscriptFound, YouTubeTranscriptApi
|
|
|
|
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
|
|
try:
|
|
transcript = transcript_list.find_transcript([self.captions_language])
|
|
except NoTranscriptFound:
|
|
for available_transcript in transcript_list:
|
|
transcript = available_transcript.translate(self.captions_language)
|
|
continue
|
|
|
|
transcript_pieces = transcript.fetch()
|
|
return " ".join([t["text"].strip(" ") for t in transcript_pieces])
|
|
|
|
def _get_document_for_video_id(self, video_id: str, **kwargs: Any) -> Document:
|
|
captions = self._get_transcripe_for_video_id(video_id)
|
|
video_response = (
|
|
self.youtube_client.videos()
|
|
.list(
|
|
part="id,snippet",
|
|
id=video_id,
|
|
)
|
|
.execute()
|
|
)
|
|
return Document(
|
|
page_content=captions,
|
|
metadata=video_response.get("items")[0],
|
|
)
|
|
|
|
def _get_channel_id(self, channel_name: str) -> str:
|
|
request = self.youtube_client.search().list(
|
|
part="id",
|
|
q=channel_name,
|
|
type="channel",
|
|
maxResults=1, # we only need one result since channel names are unique
|
|
)
|
|
response = request.execute()
|
|
channel_id = response["items"][0]["id"]["channelId"]
|
|
return channel_id
|
|
|
|
def _get_uploads_playlist_id(self, channel_id: str) -> str:
|
|
request = self.youtube_client.channels().list(
|
|
part="contentDetails",
|
|
id=channel_id,
|
|
)
|
|
response = request.execute()
|
|
return response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
|
|
|
|
def _get_document_for_channel(self, channel: str, **kwargs: Any) -> List[Document]:
|
|
try:
|
|
from youtube_transcript_api import (
|
|
NoTranscriptFound,
|
|
TranscriptsDisabled,
|
|
)
|
|
except ImportError:
|
|
raise ImportError(
|
|
"You must run"
|
|
"`pip install --upgrade "
|
|
"youtube-transcript-api` "
|
|
"to use the youtube loader"
|
|
)
|
|
|
|
channel_id = self._get_channel_id(channel)
|
|
uploads_playlist_id = self._get_uploads_playlist_id(channel_id)
|
|
request = self.youtube_client.playlistItems().list(
|
|
part="id,snippet",
|
|
playlistId=uploads_playlist_id,
|
|
maxResults=50,
|
|
)
|
|
video_ids = []
|
|
while request is not None:
|
|
response = request.execute()
|
|
|
|
# Add each video ID to the list
|
|
for item in response["items"]:
|
|
video_id = item["snippet"]["resourceId"]["videoId"]
|
|
meta_data = {"videoId": video_id}
|
|
if self.add_video_info:
|
|
item["snippet"].pop("thumbnails")
|
|
meta_data.update(item["snippet"])
|
|
try:
|
|
page_content = self._get_transcripe_for_video_id(video_id)
|
|
video_ids.append(
|
|
Document(
|
|
page_content=page_content,
|
|
metadata=meta_data,
|
|
)
|
|
)
|
|
except (TranscriptsDisabled, NoTranscriptFound, ParseError) as e:
|
|
if self.continue_on_failure:
|
|
logger.error(
|
|
"Error fetching transscript "
|
|
+ f" {item['id']['videoId']}, exception: {e}"
|
|
)
|
|
else:
|
|
raise e
|
|
pass
|
|
request = self.youtube_client.search().list_next(request, response)
|
|
|
|
return video_ids
|
|
|
|
def load(self) -> List[Document]:
|
|
"""Load documents."""
|
|
document_list = []
|
|
if self.channel_name:
|
|
document_list.extend(self._get_document_for_channel(self.channel_name))
|
|
elif self.video_ids:
|
|
document_list.extend(
|
|
[
|
|
self._get_document_for_video_id(video_id)
|
|
for video_id in self.video_ids
|
|
]
|
|
)
|
|
else:
|
|
raise ValueError("Must specify either channel_name or video_ids")
|
|
return document_list
|