diff --git a/libs/community/langchain_community/document_loaders/async_html.py b/libs/community/langchain_community/document_loaders/async_html.py
index 9d23a12abc0..832ef114446 100644
--- a/libs/community/langchain_community/document_loaders/async_html.py
+++ b/libs/community/langchain_community/document_loaders/async_html.py
@@ -1,8 +1,18 @@
 import asyncio
 import logging
 import warnings
-from concurrent.futures import ThreadPoolExecutor
-from typing import Any, Dict, Iterator, List, Optional, Union, cast
+from concurrent.futures import Future, ThreadPoolExecutor
+from typing import (
+    Any,
+    AsyncIterator,
+    Dict,
+    Iterator,
+    List,
+    Optional,
+    Tuple,
+    Union,
+    cast,
+)
 
 import aiohttp
 import requests
@@ -52,6 +62,8 @@ class AsyncHtmlLoader(BaseLoader):
         requests_kwargs: Optional[Dict[str, Any]] = None,
         raise_for_status: bool = False,
         ignore_load_errors: bool = False,
+        *,
+        preserve_order: bool = True,
     ):
         """Initialize with a webpage path."""
 
@@ -90,6 +102,7 @@ class AsyncHtmlLoader(BaseLoader):
         self.autoset_encoding = autoset_encoding
         self.encoding = encoding
         self.ignore_load_errors = ignore_load_errors
+        self.preserve_order = preserve_order
 
     def _fetch_valid_connection_docs(self, url: str) -> Any:
         if self.ignore_load_errors:
@@ -110,35 +123,6 @@ class AsyncHtmlLoader(BaseLoader):
                 "`parser` must be one of " + ", ".join(valid_parsers) + "."
             )
 
-    def _scrape(
-        self,
-        url: str,
-        parser: Union[str, None] = None,
-        bs_kwargs: Optional[dict] = None,
-    ) -> Any:
-        from bs4 import BeautifulSoup
-
-        if parser is None:
-            if url.endswith(".xml"):
-                parser = "xml"
-            else:
-                parser = self.default_parser
-
-        self._check_parser(parser)
-
-        html_doc = self._fetch_valid_connection_docs(url)
-        if not getattr(html_doc, "ok", False):
-            return None
-
-        if self.raise_for_status:
-            html_doc.raise_for_status()
-
-        if self.encoding is not None:
-            html_doc.encoding = self.encoding
-        elif self.autoset_encoding:
-            html_doc.encoding = html_doc.apparent_encoding
-        return BeautifulSoup(html_doc.text, parser, **(bs_kwargs or {}))
-
     async def _fetch(
         self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
     ) -> str:
@@ -172,51 +156,79 @@ class AsyncHtmlLoader(BaseLoader):
 
     async def _fetch_with_rate_limit(
         self, url: str, semaphore: asyncio.Semaphore
-    ) -> str:
+    ) -> Tuple[str, str]:
         async with semaphore:
-            return await self._fetch(url)
+            return url, await self._fetch(url)
 
-    async def fetch_all(self, urls: List[str]) -> Any:
-        """Fetch all urls concurrently with rate limiting."""
+    async def _lazy_fetch_all(
+        self, urls: List[str], preserve_order: bool
+    ) -> AsyncIterator[Tuple[str, str]]:
         semaphore = asyncio.Semaphore(self.requests_per_second)
-        tasks = []
-        for url in urls:
-            task = asyncio.ensure_future(self._fetch_with_rate_limit(url, semaphore))
-            tasks.append(task)
+        tasks = [
+            asyncio.create_task(self._fetch_with_rate_limit(url, semaphore))
+            for url in urls
+        ]
         try:
            from tqdm.asyncio import tqdm_asyncio
 
-            return await tqdm_asyncio.gather(
-                *tasks, desc="Fetching pages", ascii=True, mininterval=1
-            )
+            if preserve_order:
+                for task in tqdm_asyncio(
+                    tasks, desc="Fetching pages", ascii=True, mininterval=1
+                ):
+                    yield await task
+            else:
+                for task in tqdm_asyncio.as_completed(
+                    tasks, desc="Fetching pages", ascii=True, mininterval=1
+                ):
+                    yield await task
         except ImportError:
             warnings.warn("For better logging of progress, `pip install tqdm`")
-            return await asyncio.gather(*tasks)
+            if preserve_order:
+                for result in await asyncio.gather(*tasks):
+                    yield result
+            else:
+                for task in asyncio.as_completed(tasks):
+                    yield await task
+
+    async def fetch_all(self, urls: List[str]) -> List[str]:
+        """Fetch all urls concurrently with rate limiting."""
+        return [doc async for _, doc in self._lazy_fetch_all(urls, True)]
+
+    def _to_document(self, url: str, text: str) -> Document:
+        from bs4 import BeautifulSoup
+
+        if url.endswith(".xml"):
+            parser = "xml"
+        else:
+            parser = self.default_parser
+        self._check_parser(parser)
+        soup = BeautifulSoup(text, parser)
+        metadata = _build_metadata(soup, url)
+        return Document(page_content=text, metadata=metadata)
 
     def lazy_load(self) -> Iterator[Document]:
         """Lazy load text from the url(s) in web_path."""
-        for doc in self.load():
-            yield doc
-
-    def load(self) -> List[Document]:
-        """Load text from the url(s) in web_path."""
-
+        results: List[str]
         try:
             # Raises RuntimeError if there is no current event loop.
             asyncio.get_running_loop()
             # If there is a current event loop, we need to run the async code
             # in a separate loop, in a separate thread.
             with ThreadPoolExecutor(max_workers=1) as executor:
-                future = executor.submit(asyncio.run, self.fetch_all(self.web_paths))
+                future: Future[List[str]] = executor.submit(
+                    asyncio.run,  # type: ignore[arg-type]
+                    self.fetch_all(self.web_paths),  # type: ignore[arg-type]
+                )
                 results = future.result()
         except RuntimeError:
             results = asyncio.run(self.fetch_all(self.web_paths))
 
-        docs = []
-        for i, text in enumerate(cast(List[str], results)):
-            soup = self._scrape(self.web_paths[i])
-            if not soup:
-                continue
-            metadata = _build_metadata(soup, self.web_paths[i])
-            docs.append(Document(page_content=text, metadata=metadata))
-        return docs
+        for i, text in enumerate(cast(List[str], results)):
+            yield self._to_document(self.web_paths[i], text)
+
+    async def alazy_load(self) -> AsyncIterator[Document]:
+        """Lazy load text from the url(s) in web_path."""
+        async for url, text in self._lazy_fetch_all(
+            self.web_paths, self.preserve_order
+        ):
+            yield self._to_document(url, text)
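
A minimal usage sketch of the surface this diff exposes, assuming placeholder URLs: lazy_load() yields Documents one at a time in input order, while alazy_load() with preserve_order=False yields each Document as soon as its fetch completes.

import asyncio

from langchain_community.document_loaders import AsyncHtmlLoader

urls = ["https://example.com", "https://example.org"]  # placeholder URLs

# Synchronous lazy iteration: one Document per URL, yielded in input order.
loader = AsyncHtmlLoader(urls)
for doc in loader.lazy_load():
    print(doc.metadata["source"], len(doc.page_content))


# Async iteration: with preserve_order=False, Documents are yielded as each
# fetch finishes, so the order may differ from the input order.
async def main() -> None:
    loader = AsyncHtmlLoader(urls, preserve_order=False)
    async for doc in loader.alazy_load():
        print(doc.metadata["source"], len(doc.page_content))


asyncio.run(main())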