This commit is contained in:
Bagatur 2023-08-17 13:52:09 -07:00
parent 25cbcd9374
commit 8c1a528c71
2 changed files with 3 additions and 42 deletions

View File

@ -653,46 +653,6 @@ class RunnableWithFallbacks(Serializable, Runnable[Input, Output]):
raise first_error
class PutLocalVar(Serializable, Runnable[Input, Input]):
key: Union[str, Dict[str, str]]
def __init__(self, key: str, **kwargs: Any) -> None:
super().__init__(key=key, **kwargs)
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Input:
if config is None:
raise ValueError(
"PutLocalVar should only be used in a RunnableSequence, and should "
"therefore always receive a non-null config."
)
if isinstance(self.key, str):
config["_locals"][self.key] = input
else:
if not isinstance(input, Mapping):
raise ValueError
for get_key, put_key in self.key.items():
config["_locals"][put_key] = input[get_key]
return self._call_with_config(lambda x: x, input, config)
class GetLocalVar(Serializable, Runnable[str, Any]):
key: str
passthrough_key: Optional[str] = None
def __init__(self, key: str, **kwargs: Any) -> None:
super().__init__(key=key, **kwargs)
def invoke(self, input: str, config: Optional[RunnableConfig] = None) -> Any:
if config is None:
raise ValueError(
"PutLocalVar should only be used in a RunnableSequence, and should "
"therefore always receive a non-null config."
)
if self.passthrough_key is not None:
return {self.key: config["_locals"][self.key], self.passthrough_key: input}
return config["_locals"][self.key]
class RunnableSequence(Serializable, Runnable[Input, Output]):
"""
A sequence of runnables, where the output of each is the input of the next.

View File

@ -41,7 +41,8 @@ class RunnablePassthrough(Serializable, Runnable[Input, Input]):
) -> Iterator[Input]:
return self._transform_stream_with_config(input, identity, config)
def atransform(
async def atransform(
self, input: AsyncIterator[Input], config: RunnableConfig | None = None
) -> AsyncIterator[Input]:
return self._atransform_stream_with_config(input, identity, config)
async for chunk in self._atransform_stream_with_config(input, identity, config):
yield chunk