core: Add ruff rule S101 (no assert) (#29267)

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
This commit is contained in:
Christophe Bornet
2025-01-20 21:24:31 +01:00
committed by GitHub
parent e5d62c6ce7
commit 989eec4b7b
6 changed files with 90 additions and 49 deletions

View File

@@ -46,8 +46,9 @@ class AsciiCanvas:
TIMEOUT = 10
def __init__(self, cols: int, lines: int) -> None:
assert cols > 1
assert lines > 1
if cols <= 1 or lines <= 1:
msg = "Canvas dimensions should be > 1"
raise ValueError(msg)
self.cols = cols
self.lines = lines
@@ -70,11 +71,15 @@ class AsciiCanvas:
char (str): character to place in the specified point on the
canvas.
"""
assert len(char) == 1
assert x >= 0
assert x < self.cols
assert y >= 0
assert y < self.lines
if len(char) != 1:
msg = "char should be a single character"
raise ValueError(msg)
if x >= self.cols or x < 0:
msg = "x should be >= 0 and < number of columns"
raise ValueError(msg)
if y >= self.lines or y < 0:
msg = "y should be >= 0 and < number of lines"
raise ValueError(msg)
self.canvas[y][x] = char
@@ -130,8 +135,9 @@ class AsciiCanvas:
width (int): box width.
height (int): box height.
"""
assert width > 1
assert height > 1
if width <= 1 or height <= 1:
msg = "Box dimensions should be > 1"
raise ValueError(msg)
width -= 1
height -= 1
@@ -265,7 +271,9 @@ def draw_ascii(vertices: Mapping[str, str], edges: Sequence[LangEdge]) -> str:
# NOTE: first draw edges so that node boxes could overwrite them
for edge in sug.g.sE:
assert len(edge.view._pts) > 1
if len(edge.view._pts) <= 1:
msg = "Not enough points to draw an edge"
raise ValueError(msg)
for index in range(1, len(edge.view._pts)):
start = edge.view._pts[index - 1]
end = edge.view._pts[index]
@@ -275,10 +283,15 @@ def draw_ascii(vertices: Mapping[str, str], edges: Sequence[LangEdge]) -> str:
end_x = int(round(end[0] - minx))
end_y = int(round(end[1] - miny))
assert start_x >= 0
assert start_y >= 0
assert end_x >= 0
assert end_y >= 0
if start_x < 0 or start_y < 0 or end_x < 0 or end_y < 0:
msg = (
"Invalid edge coordinates: "
f"start_x={start_x}, "
f"start_y={start_y}, "
f"end_x={end_x}, "
f"end_y={end_y}"
)
raise ValueError(msg)
canvas.line(start_x, start_y, end_x, end_y, "." if edge.data else "*")

View File

@@ -472,9 +472,9 @@ class RunnableAssign(RunnableSerializable[dict[str, Any], dict[str, Any]]):
config: RunnableConfig,
**kwargs: Any,
) -> dict[str, Any]:
assert isinstance(
input, dict
), "The input to RunnablePassthrough.assign() must be a dict."
if not isinstance(input, dict):
msg = "The input to RunnablePassthrough.assign() must be a dict."
raise ValueError(msg)
return {
**input,
@@ -500,9 +500,9 @@ class RunnableAssign(RunnableSerializable[dict[str, Any], dict[str, Any]]):
config: RunnableConfig,
**kwargs: Any,
) -> dict[str, Any]:
assert isinstance(
input, dict
), "The input to RunnablePassthrough.assign() must be a dict."
if not isinstance(input, dict):
msg = "The input to RunnablePassthrough.assign() must be a dict."
raise ValueError(msg)
return {
**input,
@@ -553,9 +553,9 @@ class RunnableAssign(RunnableSerializable[dict[str, Any], dict[str, Any]]):
)
# consume passthrough stream
for chunk in for_passthrough:
assert isinstance(
chunk, dict
), "The input to RunnablePassthrough.assign() must be a dict."
if not isinstance(chunk, dict):
msg = "The input to RunnablePassthrough.assign() must be a dict."
raise ValueError(msg)
# remove mapper keys from passthrough chunk, to be overwritten by map
filtered = AddableDict(
{k: v for k, v in chunk.items() if k not in mapper_keys}
@@ -603,9 +603,10 @@ class RunnableAssign(RunnableSerializable[dict[str, Any], dict[str, Any]]):
)
# consume passthrough stream
async for chunk in for_passthrough:
assert isinstance(
chunk, dict
), "The input to RunnablePassthrough.assign() must be a dict."
if not isinstance(chunk, dict):
msg = "The input to RunnablePassthrough.assign() must be a dict."
raise ValueError(msg)
# remove mapper keys from passthrough chunk, to be overwritten by map output
filtered = AddableDict(
{k: v for k, v in chunk.items() if k not in mapper_keys}
@@ -705,9 +706,9 @@ class RunnablePick(RunnableSerializable[dict[str, Any], dict[str, Any]]):
return super().get_name(suffix, name=name)
def _pick(self, input: dict[str, Any]) -> Any:
assert isinstance(
input, dict
), "The input to RunnablePassthrough.assign() must be a dict."
if not isinstance(input, dict):
msg = "The input to RunnablePassthrough.assign() must be a dict."
raise ValueError(msg)
if isinstance(self.keys, str):
return input.get(self.keys)