Source code for underworld3.workflows._diagram

"""Render a workflow module's produces/requires DAG as a flow chart.

Walks a workflow module's ``@workflow_step``-decorated functions and
emits a Graphviz DOT source string with one node per product and one
edge per ``requires`` link.  An optional *status_provider* callback
colours nodes by cache state ("cached", "loaded", "missing", ...).

Used both as a developer aid (``runner.diagram()`` for at-a-glance
DAG inspection) and as a documentation artefact (the DOT source can
be rendered to PNG/SVG via the ``dot`` command and embedded in
notebooks or papers).

The cetz/typst rendering path mentioned in the design memo would
emit a different source string from the same DAG walk; for now the
DOT path is the working primitive.

Example
-------

>>> from underworld3.workflows import diagram
>>> import convection_config as convection
>>> dot_src = diagram(convection)
>>> Path("convection_dag.dot").write_text(dot_src)
>>> # then in a shell: dot -Tpng convection_dag.dot -o convection_dag.png
"""

from __future__ import annotations

import inspect
import shutil
import subprocess
from pathlib import Path
from typing import Callable, Optional


# Colours used when *status_provider* labels a node.
_STATUS_COLOURS = {
    "cached": "#bce6bc",      # green — sitting in memory
    "loaded": "#bcd7e6",      # blue — sitting on disk
    "on_disk": "#bcd7e6",     # alias
    "building": "#f7e389",    # yellow — currently running
    "missing": "#e6e6e6",     # grey — not yet built
    "stale": "#e6bcbc",       # red — input changed, needs rebuild
}


def _collect_dag(module) -> tuple[dict, list]:
    """Walk *module* for ``@workflow_step`` functions.

    Returns ``(producers, edges)`` where *producers* maps product
    name → producing step and *edges* is a list of ``(upstream,
    downstream)`` tuples.
    """
    producers: dict[str, Callable] = {}
    edges: list[tuple[str, str]] = []
    for _name, obj in inspect.getmembers(module, callable):
        if not getattr(obj, "_is_workflow_step", False):
            continue
        produces = getattr(obj, "workflow_produces", None) or []
        requires = getattr(obj, "workflow_requires", None) or []
        for prod in produces:
            producers[prod] = obj
            for req in requires:
                edges.append((req, prod))
    return producers, edges


[docs] def diagram( module, status_provider: Optional[Callable[[str], str]] = None, *, title: Optional[str] = None, rankdir: str = "LR", ) -> str: """Generate a Graphviz DOT source string for *module*'s DAG. Parameters ---------- module : module A workflow module (e.g. ``import convection_config as cc``; pass ``cc``). status_provider : callable, optional ``status_provider(name)`` → status string for product *name*. Recognised statuses (and their fill colours) are listed in ``_STATUS_COLOURS`` — anything else falls through to the default "missing" grey. Typically wired to :meth:`WorkflowRunner.status`. title : str, optional Graph title. Defaults to ``module.__name__``. rankdir : str Graphviz ``rankdir`` — ``"LR"`` (left→right, default) or ``"TB"`` (top→bottom). Returns ------- dot_source : str Compile with ``dot -Tpng input.dot -o output.png`` (or ``-Tsvg``) to render the diagram. """ producers, edges = _collect_dag(module) if title is None: title = getattr(module, "__name__", "workflow") lines = [ f'digraph "{title}" {{', f' rankdir="{rankdir}";', ' node [shape=box, style="rounded,filled", fontname="monospace", fontsize=10];', ' edge [fontname="monospace", fontsize=8, color="#888888"];', f' label="{title}"; labelloc="t"; fontname="sans-serif"; fontsize=12;', ] # Nodes. for name in sorted(producers): status = status_provider(name) if status_provider else None colour = _STATUS_COLOURS.get(status, _STATUS_COLOURS["missing"]) step_name = producers[name].__name__ # Tooltip carries the producing step's name so the SVG hover # text shows what built this product. lines.append( f' "{name}" [fillcolor="{colour}", ' f'tooltip="produced by {step_name}()"];' ) # Add nodes for required-but-unproduced products (external inputs) # so dangling edges aren't silently dropped. referenced = {req for req, _ in edges} external = referenced - set(producers) for name in sorted(external): lines.append( f' "{name}" [fillcolor="#fff8d0", ' f'tooltip="external input (no producer in this module)", ' f'style="rounded,filled,dashed"];' ) # Edges. for upstream, downstream in edges: lines.append(f' "{upstream}" -> "{downstream}";') lines.append("}") return "\n".join(lines)
[docs] def render( module, output_path, *, format: Optional[str] = None, status_provider: Optional[Callable[[str], str]] = None, rankdir: str = "LR", ) -> Path: """Render *module*'s DAG to an image file via the ``dot`` command. Convenience wrapper around :func:`diagram` plus ``subprocess``. Requires the ``dot`` binary on ``PATH`` (``apt install graphviz``, ``brew install graphviz``, etc.). Parameters ---------- module : module A workflow module. output_path : str or Path Destination file. Format inferred from the extension (``.png``, ``.svg``, ``.pdf``) unless overridden by *format*. format : str, optional Graphviz output format, e.g. ``"png"``, ``"svg"``, ``"pdf"``. Inferred from *output_path* extension if not given. status_provider, rankdir Forwarded to :func:`diagram`. Returns ------- output_path : Path Resolved path of the written image. Raises ------ FileNotFoundError If the ``dot`` command is not on PATH. """ output_path = Path(output_path) if format is None: ext = output_path.suffix.lstrip(".").lower() format = ext or "png" dot_bin = shutil.which("dot") if dot_bin is None: raise FileNotFoundError( "Graphviz 'dot' binary not found on PATH. " "Install with `brew install graphviz` (macOS), " "`apt install graphviz` (Debian/Ubuntu), or equivalent." ) src = diagram( module, status_provider=status_provider, rankdir=rankdir, ) output_path.parent.mkdir(parents=True, exist_ok=True) subprocess.run( [dot_bin, f"-T{format}", "-o", str(output_path)], input=src.encode("utf-8"), check=True, ) return output_path
__all__ = ["diagram", "render"]