Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions marimo/_ast/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ def __init__(self, **kwargs: Any) -> None:
self._cell_manager = CellManager(prefix=cell_prefix)
self._graph = dataflow.DirectedGraph()
self._execution_context: ExecutionContext | None = None
self._runner = dataflow.Runner(self._graph)
self._header: str | None = None

self._unparsable_code: list[str] = []
Expand Down Expand Up @@ -780,17 +779,23 @@ def process_data(pd, batch_size, learning_rate):
async def _run_cell_async(
self, cell: Cell, kwargs: dict[str, Any]
) -> tuple[Any, _Namespace]:
from marimo._runtime.runner import by_kwargs

self._maybe_initialize()
output, defs = await self._runner.run_cell_async(
cell._cell.cell_id, kwargs
output, defs = await by_kwargs.run_cell_async(
self._graph, cell._cell.cell_id, kwargs
)
return output, _Namespace(defs, owner=self)

def _run_cell_sync(
self, cell: Cell, kwargs: dict[str, Any]
) -> tuple[Any, _Namespace]:
from marimo._runtime.runner import by_kwargs

self._maybe_initialize()
output, defs = self._runner.run_cell_sync(cell._cell.cell_id, kwargs)
output, defs = by_kwargs.run_cell_sync(
Comment thread
dmadisetti marked this conversation as resolved.
self._graph, cell._cell.cell_id, kwargs
)
return output, _Namespace(defs, owner=self)

async def _set_ui_element_value(
Expand Down Expand Up @@ -1000,11 +1005,6 @@ def set_execution_context(
) -> None:
self._app._execution_context = execution_context

@property
def runner(self) -> dataflow.Runner:
self._app._maybe_initialize()
return self._app._runner

def update_config(self, updates: dict[str, Any]) -> _AppConfig:
return self.config.update(updates)

Expand Down
16 changes: 13 additions & 3 deletions marimo/_ast/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,11 @@ def _is_coroutine(self) -> bool:
if hasattr(self, "_is_coro_cached"):
return self._is_coro_cached
assert self._app is not None
self._is_coro_cached: bool = self._app.runner.is_coroutine(
self._cell.cell_id
from marimo._runtime.runner import by_kwargs

# ``graph`` triggers _maybe_initialize on the underlying App.
self._is_coro_cached: bool = by_kwargs.is_coroutine(
self._app.graph, self._cell.cell_id
)
return self._is_coro_cached

Expand Down Expand Up @@ -655,8 +658,15 @@ def add(mo, x, y):
}
refs = {**from_setup, **refs}

from marimo._runtime.runner import by_kwargs

try:
if self._is_coroutine:
# Refresh the async decision with the caller's substitutions —
# an unsubstituted ancestor may have been async but isn't on
# this call's ancestor closure.
if by_kwargs.is_coroutine(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is no longer cached. intentional?

self._app.graph, self._cell.cell_id, refs
):
return self._app.run_cell_async(cell=self, kwargs=refs)
else:
return self._app.run_cell_sync(cell=self, kwargs=refs)
Expand Down
31 changes: 3 additions & 28 deletions marimo/_messaging/tracebacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ def write_traceback(traceback: str) -> None:
# In run mode, only forward to the frontend if show_tracebacks is on.
if in_run_mode and not _show_tracebacks_enabled():
return
# Strip marimo's internal executor.py frame and highlight for the UI
trimmed = _trim_traceback(traceback)
sys.stderr._write_with_mimetype(
_highlight_traceback(trimmed),
_highlight_traceback(traceback),
mimetype="application/vnd.marimo+traceback",
)
else:
Expand All @@ -64,16 +62,15 @@ def write_traceback(traceback: str) -> None:
if in_run_mode and not _show_tracebacks_enabled():
sys.stderr.write(traceback)
return
trimmed = _trim_traceback(traceback)
broadcast_notification(
CellNotification(
cell_id=ctx.cell_id,
console=CellOutput(
channel=CellChannel.STDERR,
mimetype="application/vnd.marimo+traceback",
data=trimmed
data=traceback
if code_mode
else _highlight_traceback(trimmed),
else _highlight_traceback(traceback),
),
),
ctx.stream,
Expand All @@ -83,27 +80,5 @@ def write_traceback(traceback: str) -> None:
sys.stderr.write(traceback)


def _trim_traceback(traceback: str) -> str:
"""
Skip first DefaultExecutor.execute_cell traceback item which all traces start with.
"""

lines = traceback.split("\n")
if (
len(lines) > 2
and lines[0] == "Traceback (most recent call last):"
and (
'/marimo/_runtime/executor.py", line ' in lines[1]
or '\\marimo\\_runtime\\executor.py", line ' in lines[1]
)
and lines[1].endswith(", in execute_cell")
):
for i in range(2, len(lines)):
if lines[i].startswith(" File "):
return "\n".join(lines[:1] + lines[i:])

return traceback


def is_code_highlighting(value: str) -> bool:
return 'class="codehilite"' in value
155 changes: 68 additions & 87 deletions marimo/_runtime/app/script_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from __future__ import annotations

import asyncio
from collections import deque
from typing import TYPE_CHECKING, Any

from marimo._ast.names import SETUP_CELL_NAME
Expand All @@ -21,12 +20,17 @@
MarimoRuntimeException,
unwrap_user_exception,
)
from marimo._runtime.executor import resolve_executor
from marimo._runtime.executor import (
Evaluator,
resolve_executor,
)
from marimo._runtime.patches import (
create_main_module,
extract_docstring_from_header,
patch_main_module_context,
)
from marimo._runtime.runner.result import RunResult
from marimo._runtime.runner.scheduler import SequentialScheduler
from marimo._types.ids import CellId_t

if TYPE_CHECKING:
Expand All @@ -47,7 +51,6 @@ def __init__(
self.app = app
self.filename = filename
self._docstring = extract_docstring_from_header(app._app._header)
self.cells_cancelled: set[CellId_t] = set()
self._glbls = glbls if glbls else {}

# Setup cell cannot be overridden, and it's possible that some
Expand All @@ -59,24 +62,21 @@ def __init__(
excluded=CellId_t(SETUP_CELL_NAME),
)

self.cells_to_run: deque[CellId_t] = deque(
cells_to_run = [
cid
for cid in pruned_execution_order
if app.cell_manager.cell_data_at(cid).cell is not None
and not self.app.graph.is_disabled(cid)
)
self._executor = resolve_executor()
]

def _cancel(self, cell_id: CellId_t) -> None:
cancelled = {
cid
for cid in dataflow.transitive_closure(self.app.graph, {cell_id})
if cid in self.cells_to_run
}
for cid in cancelled:
self.app.graph.cells[cid].set_run_result_status("cancelled")
self.cells_cancelled |= cancelled
self._scheduler = SequentialScheduler(cells_to_run, self.app.graph)
self._evaluator = Evaluator(executor=resolve_executor(), lifecycles=[])

# _run_synchronous and _run_asynchronous are deliberate near-twins:
# the only difference is the await on the cell step. Keeping them
# as separate methods (rather than wrapping with asyncio.run
# unconditionally) preserves the no-event-loop guarantee for purely
# synchronous apps.
def _run_synchronous(
self,
post_execute_hooks: list[Callable[[], Any]],
Expand All @@ -93,39 +93,20 @@ def _run_synchronous(
glbls.update(self._glbls)

outputs: dict[CellId_t, Any] = {}
while self.cells_to_run:
cid = self.cells_to_run.popleft()
if cid in self.cells_cancelled:
while self._scheduler.pending():
cid = self._scheduler.pop_cell()
if self._scheduler.cancelled(cid):
continue
# Set up has already run in this case.
# Setup has already run by this point.
if cid == CellId_t(SETUP_CELL_NAME):
for hook in post_execute_hooks:
hook()
continue

cell = self.app.graph.cells[cid]
with get_context().with_cell_id(cid):
try:
output = self._executor.execute_cell(cell, glbls)
outputs[cid] = output
except MarimoRuntimeException as e:
unwrapped_exception = unwrap_user_exception(
e, self.app.graph
)

if isinstance(unwrapped_exception, MarimoStopError):
self._cancel(cid)
elif isinstance(
unwrapped_exception, MarimoMissingRefError
):
name_err = unwrapped_exception.name_error
raise (
name_err
if name_err is not None
else unwrapped_exception
) from None
else:
raise
result = self._evaluator.evaluate_sync(cell, glbls)
self._handle_run_result(cid, result, outputs)
finally:
for hook in post_execute_hooks:
hook()
Expand All @@ -147,47 +128,63 @@ async def _run_asynchronous(
glbls.update(self._glbls)

outputs: dict[CellId_t, Any] = {}

while self.cells_to_run:
cid = self.cells_to_run.popleft()
if cid in self.cells_cancelled:
while self._scheduler.pending():
cid = self._scheduler.pop_cell()
if self._scheduler.cancelled(cid):
continue

# Setup has already run by this point.
if cid == CellId_t(SETUP_CELL_NAME):
for hook in post_execute_hooks:
hook()
continue

cell = self.app.graph.cells[cid]
with get_context().with_cell_id(cid):
try:
output = await self._executor.execute_cell_async(
cell, glbls
)
outputs[cid] = output
except MarimoRuntimeException as e:
unwrapped_exception = unwrap_user_exception(
e, self.app.graph
)

if isinstance(unwrapped_exception, MarimoStopError):
self._cancel(cid)
elif isinstance(
unwrapped_exception, MarimoMissingRefError
):
name_err = unwrapped_exception.name_error
raise (
name_err
if name_err is not None
else unwrapped_exception
) from None
else:
raise
result = await self._evaluator.evaluate(cell, glbls)
self._handle_run_result(cid, result, outputs)
finally:
for hook in post_execute_hooks:
hook()
return outputs, glbls

def _handle_run_result(
self,
cid: CellId_t,
result: RunResult,
outputs: dict[CellId_t, Any],
) -> None:
"""Classify the Evaluator's RunResult; record output, cancel, or raise.

Aligns MarimoStopError handling with the kernel classifier: the
stop's output is recorded for the cell and descendants are
cancelled, instead of silently swallowing both.
"""
exc = result.exception
if exc is None:
outputs[cid] = result.output
return
if not isinstance(exc, BaseException):
# An Error-shape payload (e.g. ``MarimoStrictExecutionError``)
# from a lifecycle ``Skip(result=...)``. Script mode runs with
# no lifecycles today, so this is unreachable in practice;
# treat defensively by recording the output and cancelling
# descendants.
outputs[cid] = result.output
self._scheduler.cancel(cid)
return
if isinstance(exc, MarimoRuntimeException):
unwrapped = unwrap_user_exception(exc, self.app.graph)
if isinstance(unwrapped, MarimoStopError):
outputs[cid] = unwrapped.output
self._scheduler.cancel(cid)
return
if isinstance(unwrapped, MarimoMissingRefError):
name_err = unwrapped.name_error
raise (
name_err if name_err is not None else unwrapped
) from None
raise exc

def run(self) -> RunOutput:
from marimo._runtime.context.script_context import (
initialize_script_context,
Expand Down Expand Up @@ -231,7 +228,7 @@ def run(self) -> RunOutput:
theme=get_context().marimo_config["display"]["theme"]
)

post_execute_hooks = []
post_execute_hooks: list[Callable[[], Any]] = []
if DependencyManager.matplotlib.has():
from marimo._output.mpl import close_figures

Expand All @@ -249,25 +246,9 @@ def run(self) -> RunOutput:
)
return outputs, defs

# Cell runner manages the exception handling for kernel
# runner, but script runner should raise the wrapped
# exception if invoked directly.
# Raise the wrapped user exception from "None" so the stack
# trace points at the failing cell, not the runner.
except MarimoRuntimeException as e:
# MarimoMissingRefError, wraps the under lying NameError
# for context, so we raise the NameError directly.
if isinstance(e.__cause__, MarimoMissingRefError):
# For type checking + sanity check
if not isinstance(e.__cause__.name_error, NameError):
raise MarimoRuntimeException(
"Unexpected error occurred while running the app. "
"Improperly wrapped MarimoMissingRefError exception. "
"Please report this issue to "
"https://github.com/marimo-team/marimo/issues"
) from e.__cause__
raise e.__cause__.name_error from e.__cause__
# For all other exceptions, we raise the wrapped exception
# from "None" to indicate this is an Error propagation, and to not
# muddy the stacktrace from the failing cells themselves.
raise e.__cause__ from None # type: ignore
finally:
if installed_script_context:
Expand Down
2 changes: 0 additions & 2 deletions marimo/_runtime/dataflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from marimo import _loggers
from marimo._ast.cell import CellImpl
from marimo._runtime.dataflow.graph import DirectedGraph
from marimo._runtime.dataflow.runner import Runner
from marimo._runtime.dataflow.topology import GraphTopology
from marimo._runtime.dataflow.types import Edge, EdgeWithVar
from marimo._types.ids import CellId_t
Expand Down Expand Up @@ -253,7 +252,6 @@ def import_block_relatives(cid: CellId_t, children: bool) -> set[CellId_t]:
"DirectedGraph",
"Edge",
"EdgeWithVar",
"Runner",
"get_cycles",
"get_import_block_relatives",
"induced_subgraph",
Expand Down
Loading
Loading