mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-26 13:27:46 +00:00
93 lines
3.1 KiB
Python
93 lines
3.1 KiB
Python
import asyncio
|
|
import contextvars
|
|
from abc import ABC, abstractmethod
|
|
from concurrent.futures import Executor, ThreadPoolExecutor
|
|
from functools import partial
|
|
from typing import Any, Callable, Optional
|
|
|
|
from dbgpt.component import BaseComponent, ComponentType, SystemApp
|
|
|
|
|
|
class ExecutorFactory(BaseComponent, ABC):
    """Abstract component that hands out a ``concurrent.futures.Executor``.

    The component ``name`` is ``ComponentType.EXECUTOR_DEFAULT.value``;
    concrete subclasses implement :meth:`create`.
    """

    name = ComponentType.EXECUTOR_DEFAULT.value

    @abstractmethod
    def create(self) -> "Executor":
        """Return the executor supplied by this factory."""
|
|
|
|
|
|
class DefaultExecutorFactory(ExecutorFactory):
    """Executor factory backed by one ``ThreadPoolExecutor``.

    The pool is built once in ``__init__`` and the same instance is returned
    by every call to :meth:`create`.
    """

    def __init__(
        self,
        system_app: Optional[SystemApp] = None,
        max_workers: Optional[int] = None,
    ):
        """Build the thread pool.

        Args:
            system_app: Forwarded unchanged to the base class initializer.
            max_workers: Forwarded to ``ThreadPoolExecutor``; ``None`` lets
                the standard library pick its default pool size.
        """
        # ``Optional[...]`` rather than ``X | None``: the annotation is
        # evaluated when the function is defined, and the ``|`` form raises
        # TypeError on interpreters older than Python 3.10.
        super().__init__(system_app)
        # Worker threads are named after the component (``self.name``).
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers, thread_name_prefix=self.name
        )

    def init_app(self, system_app: SystemApp):
        """Do nothing; the executor is already created in ``__init__``."""
        pass

    def create(self) -> Executor:
        """Return the shared thread pool created at construction time."""
        return self._executor
|
|
|
|
|
|
# Type alias for a synchronous callable that takes arbitrary arguments and
# returns any value; used to annotate functions offloaded to an executor.
BlockingFunction = Callable[..., Any]
|
|
|
|
|
|
async def blocking_func_to_async(
    executor: Optional[Executor], func: BlockingFunction, *args, **kwargs
) -> Any:
    """Run a synchronous, potentially blocking function in an executor.

    The caller's ``contextvars`` context is copied before dispatch and
    ``func`` is invoked inside that copy, so context variables set by the
    caller are visible to ``func`` where the executor runs it.

    Args:
        executor: The ``concurrent.futures.Executor`` to run the function in.
            ``None`` is forwarded to ``loop.run_in_executor``, which then
            uses the event loop's default executor.
        func: A synchronous callable. It is invoked as
            ``func(*args, **kwargs)`` and its plain return value is returned.
        *args: Positional arguments passed to ``func``.
        **kwargs: Keyword arguments passed to ``func``.

    Returns:
        Whatever ``func`` returns.

    Raises:
        ValueError: If ``func`` is a coroutine function; such functions
            should be awaited directly instead of being sent to an executor.
    """
    if asyncio.iscoroutinefunction(func):
        raise ValueError(f"The function {func} is not blocking function")

    # Snapshot the context here, in the calling task; the executor would
    # otherwise run ``func`` without the caller's context variables.
    ctx = contextvars.copy_context()
    call_in_context = partial(ctx.run, func, *args, **kwargs)

    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, call_in_context)
|
|
|
|
|
|
async def blocking_func_to_async_no_executor(func: BlockingFunction, *args, **kwargs):
    """Run a potentially blocking function without supplying an executor.

    Delegates to ``blocking_func_to_async`` with ``None`` as the executor and
    returns the result of ``func(*args, **kwargs)``.
    """
    pending = blocking_func_to_async(None, func, *args, **kwargs)  # type: ignore
    return await pending
|
|
|
|
|
|
class AsyncToSyncIterator:
    """Synchronous iterator adapter over an async iterable.

    Every ``next()`` call drives a single ``__anext__()`` of the wrapped
    async iterator to completion with ``loop.run_until_complete``. Items
    therefore have to be pulled from code that is not itself running inside
    that loop (``run_until_complete`` refuses to run on a running loop).
    """

    def __init__(self, async_iterable, loop: asyncio.BaseEventLoop):
        """Store the async iterable and the loop used to drive it.

        Args:
            async_iterable: Object providing ``__aiter__``.
            loop: Event loop on which each ``__anext__()`` is executed.
        """
        self.async_iterable = async_iterable
        # Created on first use: by ``__iter__`` or by the first ``__next__``.
        self.async_iterator = None
        self._loop = loop

    def __iter__(self):
        """Obtain the async iterator from the iterable and return ``self``."""
        self.async_iterator = self.async_iterable.__aiter__()
        return self

    def __next__(self):
        """Return the next item, blocking until the async iterator yields.

        Raises:
            StopIteration: When the async iterator is exhausted.
        """
        if self.async_iterator is None:
            # ``next()`` was called without a prior ``iter()``; start the
            # async iterator here instead of falsely reporting exhaustion.
            self.async_iterator = self.async_iterable.__aiter__()

        try:
            return self._loop.run_until_complete(self.async_iterator.__anext__())
        except StopAsyncIteration:
            # Translate to the sync protocol; ``from None`` keeps the async
            # exception out of the traceback context.
            raise StopIteration from None
|