mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-28 06:17:14 +00:00
145 lines
4.4 KiB
Python
145 lines
4.4 KiB
Python
"""Cache manager."""
|
|
|
|
import logging
|
|
from abc import ABC, abstractmethod
|
|
from concurrent.futures import Executor
|
|
from typing import Optional, Type, cast
|
|
|
|
from dbgpt.component import BaseComponent, ComponentType, SystemApp
|
|
from dbgpt.core import CacheConfig, CacheKey, CacheValue, Serializable, Serializer
|
|
from dbgpt.core.interface.cache import K, V
|
|
from dbgpt.util.executor_utils import ExecutorFactory, blocking_func_to_async
|
|
|
|
from .storage.base import CacheStorage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CacheManager(BaseComponent, ABC):
|
|
"""The cache manager interface."""
|
|
|
|
name = ComponentType.MODEL_CACHE_MANAGER
|
|
|
|
def __init__(self, system_app: SystemApp | None = None):
|
|
"""Create cache manager."""
|
|
super().__init__(system_app)
|
|
|
|
def init_app(self, system_app: SystemApp):
|
|
"""Initialize cache manager."""
|
|
self.system_app = system_app
|
|
|
|
@abstractmethod
|
|
async def set(
|
|
self,
|
|
key: CacheKey[K],
|
|
value: CacheValue[V],
|
|
cache_config: Optional[CacheConfig] = None,
|
|
):
|
|
"""Set cache with key."""
|
|
|
|
@abstractmethod
|
|
async def get(
|
|
self,
|
|
key: CacheKey[K],
|
|
cls: Type[Serializable],
|
|
cache_config: Optional[CacheConfig] = None,
|
|
) -> Optional[CacheValue[V]]:
|
|
"""Retrieve cache with key."""
|
|
|
|
@property
|
|
@abstractmethod
|
|
def serializer(self) -> Serializer:
|
|
"""Return serializer to serialize/deserialize cache value."""
|
|
|
|
|
|
class LocalCacheManager(CacheManager):
|
|
"""Local cache manager."""
|
|
|
|
def __init__(
|
|
self, system_app: SystemApp, serializer: Serializer, storage: CacheStorage
|
|
) -> None:
|
|
"""Create local cache manager."""
|
|
super().__init__(system_app)
|
|
self._serializer = serializer
|
|
self._storage = storage
|
|
|
|
@property
|
|
def executor(self) -> Executor:
|
|
"""Return executor."""
|
|
return self.system_app.get_component( # type: ignore
|
|
ComponentType.EXECUTOR_DEFAULT, ExecutorFactory
|
|
).create()
|
|
|
|
async def set(
|
|
self,
|
|
key: CacheKey[K],
|
|
value: CacheValue[V],
|
|
cache_config: Optional[CacheConfig] = None,
|
|
):
|
|
"""Set cache with key."""
|
|
if self._storage.support_async():
|
|
await self._storage.aset(key, value, cache_config)
|
|
else:
|
|
await blocking_func_to_async(
|
|
self.executor, self._storage.set, key, value, cache_config
|
|
)
|
|
|
|
async def get(
|
|
self,
|
|
key: CacheKey[K],
|
|
cls: Type[Serializable],
|
|
cache_config: Optional[CacheConfig] = None,
|
|
) -> Optional[CacheValue[V]]:
|
|
"""Retrieve cache with key."""
|
|
if self._storage.support_async():
|
|
item_bytes = await self._storage.aget(key, cache_config)
|
|
else:
|
|
item_bytes = await blocking_func_to_async(
|
|
self.executor, self._storage.get, key, cache_config
|
|
)
|
|
if not item_bytes:
|
|
return None
|
|
return cast(
|
|
CacheValue[V], self._serializer.deserialize(item_bytes.value_data, cls)
|
|
)
|
|
|
|
@property
|
|
def serializer(self) -> Serializer:
|
|
"""Return serializer to serialize/deserialize cache value."""
|
|
return self._serializer
|
|
|
|
|
|
def initialize_cache(
|
|
system_app: SystemApp, storage_type: str, max_memory_mb: int, persist_dir: str
|
|
):
|
|
"""Initialize cache manager.
|
|
|
|
Args:
|
|
system_app (SystemApp): The system app.
|
|
storage_type (str): The storage type.
|
|
max_memory_mb (int): The max memory in MB.
|
|
persist_dir (str): The persist directory.
|
|
"""
|
|
from dbgpt.util.serialization.json_serialization import JsonSerializer
|
|
|
|
from .storage.base import MemoryCacheStorage
|
|
|
|
if storage_type == "disk":
|
|
try:
|
|
from .storage.disk.disk_storage import DiskCacheStorage
|
|
|
|
cache_storage: CacheStorage = DiskCacheStorage(
|
|
persist_dir, mem_table_buffer_mb=max_memory_mb
|
|
)
|
|
except ImportError as e:
|
|
logger.warn(
|
|
f"Can't import DiskCacheStorage, use MemoryCacheStorage, import error "
|
|
f"message: {str(e)}"
|
|
)
|
|
cache_storage = MemoryCacheStorage(max_memory_mb=max_memory_mb)
|
|
else:
|
|
cache_storage = MemoryCacheStorage(max_memory_mb=max_memory_mb)
|
|
system_app.register(
|
|
LocalCacheManager, serializer=JsonSerializer(), storage=cache_storage
|
|
)
|