Workflows

The underworld3.workflows package provides infrastructure for organising simulations as DAGs of cached, idempotent computations. For the conceptual overview see the workflow concepts guide; the convection example walks through one workflow end to end.

Workflow infrastructure for domain-specific simulation packages.

Provides a WorkflowConfig base class and small utilities so that external packages (e.g. uw3-hydrogen, uw3-groundwater) can define validated, serializable parameter sets on top of Underworld3.

See docs/developer/guides/workflow-packages.md for the full pattern.

The package’s public API version is exposed as underworld3.workflows.__api_version__. Every manifest written by Run.write_manifest or WorkflowProducts.save carries this stamp under the key workflow_api so older artefacts can be identified.

Configuration

WorkflowConfig

The Pydantic base class every workflow’s config inherits from. Subclasses declare _identity_fields to mark which fields invalidate cached products on change.

class underworld3.workflows.WorkflowConfig[source]

Bases: BaseModel

Base for domain-specific workflow configurations.

Subclass this in your workflow package to define validated, serializable parameter sets. The base class provides:

  • Standard metadata fields (name, description, output directory).

  • Optional reference-quantity strings that setup_model parses into uw.quantity objects and registers on the global Model.

  • YAML round-trip serialisation.

Example

>>> from underworld3.workflows import WorkflowConfig
>>> class MyConfig(WorkflowConfig):
...     depth_km: float = 100.0
...     viscosity: float = 1e21
...
>>> cfg = MyConfig(workflow_name="demo", ref_length="100 km")
>>> cfg.save_yaml("params.yaml")
>>> cfg2 = MyConfig.from_yaml("params.yaml")

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}

Pydantic model configuration. With extra='allow', keyword arguments that do not match a declared field are accepted and stored on the instance rather than discarded (Pydantic's default).

workflow_name: str
description: str
output_dir: str
ref_length: str | None
ref_viscosity: str | None
ref_diffusivity: str | None
ref_temperature: str | None
ref_density: str | None
ref_velocity: str | None
cache_key(requires=None)[source]

Cache key derived from this config’s identity fields.

Returns None if the subclass has not declared _identity_fields — signalling to the workflow runner that input-hash freshness tracking is not enabled and it should fall back to existence-based caching (legacy behaviour from before this base method existed).

Parameters:

requires (mapping of str → str, optional) – Cache keys of upstream products this product depends on, keyed by product name. Folded into the digest so an upstream invalidation propagates downstream.

Return type:

str | None

view()[source]

Display configuration as a formatted table.

In Jupyter this renders as an HTML table via IPython.display. In a terminal it falls back to a plain-text table.

setup_model(name=None)[source]

Create (or reset) a uw.Model with reference quantities from this config.

Parameters:

name (str, optional) – Model name. Defaults to self.workflow_name.

Return type:

uw.Model

save_yaml(path)[source]

Write configuration to a YAML file.

Parameters:

path (str or Path) – Destination file path.

Return type:

None

classmethod from_yaml(path)[source]

Load configuration from a YAML file.

Parameters:

path (str or Path) – Source file path.

Returns:

A new instance of this class (or subclass).

Return type:

WorkflowConfig

config_cache_key / config_snapshot

Helpers used by WorkflowConfig.cache_key() and the runner. Most code shouldn’t call these directly, but they’re exported for custom workflows that need fine-grained control.

underworld3.workflows.config_cache_key(config, identity_fields, requires=None)[source]

Return a cache key derived from config’s identity fields.

Parameters:
  • config (Pydantic BaseModel-like) – The configuration object. Each name in identity_fields is read off it via getattr.

  • identity_fields (iterable of str) – The names of fields whose values determine the cache key. Other config fields (e.g. operational tolerances) are ignored.

  • requires (mapping str -> str, optional) – Cache keys of upstream products this one depends on, keyed by product name. Folded into the digest so an upstream invalidation propagates downstream.

Returns:

digest – Hex digest of length _DIGEST_LEN.

Return type:

str

Examples

>>> # Convection's identity hash
>>> config_cache_key(config, ["rayleigh", "cellsize", "T_degree"])
'32f71431391477a7'
>>> # A downstream aggregation depending on multiple upstream products
>>> config_cache_key(
...     sweep_config,
...     ["rayleigh_values", "aspect_ratios"],
...     requires={"run_summary_Ra1e4": "abc...", "run_summary_Ra1e5": "def..."},
... )
underworld3.workflows.config_snapshot(config, identity_fields)[source]

Return {field: getattr(config, field)} for the listed fields.

Mirror of config_cache_key()’s input handling — the snapshot is what the cache key was derived from, persisted alongside the digest so a future reader can audit what changed.

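Parameters:
  • config (Pydantic BaseModel-like) – The configuration object. Each name in identity_fields is read off it via getattr.

  • identity_fields (Iterable[str]) – The names of the fields to include in the snapshot.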

Return type:

dict
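To make the identity-field idea concrete, here is a minimal, self-contained sketch of how a key like this could be computed. It is not the library's implementation: the SHA-256 hash, the JSON encoding, the 16-character truncation (chosen only to match the length of the example digest above) and the helper names identity_snapshot and sketch_cache_key are all assumptions, so its keys will not match real ones.

```python
import hashlib
import json
from types import SimpleNamespace

DIGEST_LEN = 16  # assumption: stands in for the private _DIGEST_LEN


def identity_snapshot(config, identity_fields):
    """Hypothetical analogue of config_snapshot(): {field: value} for listed fields."""
    return {field: getattr(config, field) for field in identity_fields}


def sketch_cache_key(config, identity_fields, requires=None):
    """Hypothetical analogue of config_cache_key()."""
    payload = {
        "config": identity_snapshot(config, identity_fields),
        # Upstream keys are folded in, so an upstream change alters this key too.
        "requires": dict(requires or {}),
    }
    # sort_keys=True makes the digest independent of dict ordering.
    blob = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:DIGEST_LEN]


# A stand-in config: rtol is an operational tolerance, not an identity field.
config = SimpleNamespace(rayleigh=1e5, cellsize=0.05, T_degree=2, rtol=1e-6)
fields = ["rayleigh", "cellsize", "T_degree"]

key = sketch_cache_key(config, fields)
config.rtol = 1e-8                          # tolerance change: key is unchanged
same_key = sketch_cache_key(config, fields)
downstream = sketch_cache_key(config, fields, requires={"mesh": key})
```

The point of the split is visible in the usage: only identity fields and upstream keys move the digest, so tightening a solver tolerance does not invalidate cached products.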

Step decorator

workflow_step

Mark a function as a workflow step with declared produces / requires lists. The runner walks these to resolve the DAG.

underworld3.workflows.workflow_step(fn=None, *, description=None, produces=None, requires=None)[source]

Mark a function as a workflow helper step.

Attaches a .view() method that displays the function source (syntax-highlighted in Jupyter). The decorator is transparent — the wrapped function behaves identically to the original.

Can be used with or without arguments:

@workflow_step
def create_mesh(config): ...

@workflow_step(description="Build the simulation mesh")
def create_mesh(config): ...

@workflow_step(
    description="Adapt mesh near fault surfaces",
    produces=["adapted_mesh"],
    requires=["mesh", "fault_surfaces"],
)
def adapt_mesh(mesh, faults, config): ...

The description is stored as fn.workflow_description and the produces/requires lists document the DAG of product dependencies (used by view() and WorkflowProducts).

Parameters:
  • fn (callable, optional) – The function to decorate (when used without parentheses).

  • description (str, optional) – Short human-readable description of this step.

  • produces (list of str, optional) – Product names this step creates.

  • requires (list of str, optional) – Product names this step depends on.
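The dual calling convention (bare @workflow_step versus @workflow_step(...)) is a standard Python decorator pattern. A minimal sketch of it follows. sketch_workflow_step is a hypothetical name; workflow_description is the attribute named above, while workflow_produces and workflow_requires are hypothetical attribute names, and the .view() method the real decorator attaches is omitted.

```python
def sketch_workflow_step(fn=None, *, description=None, produces=None, requires=None):
    """Hypothetical sketch of the bare / with-arguments decorator pattern."""

    def mark(func):
        func.workflow_description = description        # attribute named in the docs
        func.workflow_produces = list(produces or [])  # hypothetical attribute name
        func.workflow_requires = list(requires or [])  # hypothetical attribute name
        return func  # returned unchanged, so calls behave exactly as before

    if fn is not None:  # used bare: @sketch_workflow_step
        return mark(fn)
    return mark         # used with arguments: @sketch_workflow_step(...)


@sketch_workflow_step
def create_mesh(config):
    return f"mesh for {config}"


@sketch_workflow_step(
    description="Adapt mesh near fault surfaces",
    produces=["adapted_mesh"],
    requires=["mesh", "fault_surfaces"],
)
def adapt_mesh(mesh, faults, config):
    return (mesh, faults, config)
```

Returning the original function (rather than a wrapper) is what keeps the decorator transparent: the step can still be called directly in a notebook without a runner.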

Run-directory primitives

Run

Thin wrapper around an on-disk run directory: manifest, h5 chain, timeseries CSV, summary. Used by time-loop workflows where the run directory is itself a workflow product.

class underworld3.workflows.Run[source]

Bases: object

A run-output directory — thin wrapper, no IO on construction.

Construction does not touch disk; Run.create() is the helper that makes the directory. Reads (manifest, steps, timeseries, summary) all hit disk fresh on each access — no caching.

__init__(path)[source]
classmethod open(path)[source]

Open an existing run directory.

Currently a thin alias for Run(path): it does not check that the path exists or contains a manifest. Validation of workflow_api / config_hash is deferred until __api_version__ = '1.0', once a second consumer has firmed up the contract.

Return type:

Run

classmethod create(path)[source]

Create the directory (parents + exist_ok) and return a Run.

Return type:

Run

property manifest: Manifest | None

The manifest, or None if manifest.yaml doesn’t exist.

write_manifest(data)[source]

Write manifest.yaml with the given dict.

Auto-injects two fields if the caller hasn’t supplied them:

  • workflow_api — the current package __api_version__.

  • cache_key — copied from config_hash for time-loop run-directories where the run’s identity is the only input. Workflows producing run-directories that depend on upstream products should set cache_key (and inputs) on the dict explicitly.

Manifests written before these stamps were introduced read back with manifest.workflow_api and manifest.cache_key both None.

Parameters:

data (dict)

Return type:

None
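The auto-injection rule can be sketched as a pure function over the manifest dict. This is an illustration only: prepare_manifest and API_VERSION are hypothetical names, the YAML write is omitted, and leaving cache_key unset when config_hash is absent is an assumption.

```python
API_VERSION = "sketch"  # hypothetical stand-in for underworld3.workflows.__api_version__


def prepare_manifest(data):
    """Hypothetical analogue of the injection step in Run.write_manifest()."""
    out = dict(data)  # copy so the caller's dict is left untouched
    out.setdefault("workflow_api", API_VERSION)
    # Time-loop runs: the run's own config hash doubles as its cache key.
    if "cache_key" not in out and "config_hash" in out:
        out["cache_key"] = out["config_hash"]
    return out


simple = prepare_manifest({"workflow": "convection", "config_hash": "32f71431391477a7"})
# A run that depends on upstream products sets cache_key and inputs explicitly.
explicit = prepare_manifest(
    {
        "config_hash": "32f71431391477a7",
        "cache_key": "abcdef0123456789",
        "inputs": {"requires": {"mesh": "0123456789abcdef"}},
    }
)
```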

property steps: list[int]

Indices of saved timesteps, derived from the xdmf files.

load_field(var, step, *, data_name=None)[source]

Read the saved field for var at step into var in place.

Thin wrapper around var.read_timestep that fills in this run’s directory and the workflow’s filename stem. The h5 layer does kd-tree interpolation when var’s mesh / FE space differs from what was written, which is what makes the warm-start recipe work across degrees and meshes.

data_name defaults to var’s constructor name (e.g. "T").

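Parameters:
  • var (MeshVariable) – The variable to read into; its data is overwritten in place.

  • step (int) – Index of the saved timestep to read.

  • data_name (str | None) – Name the field was saved under. Defaults to var’s constructor name.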

Return type:

None

property timeseries_path: Path
property timeseries: list[dict]

Read timeseries.csv as a list of typed-row dicts.

step is parsed as int; t and dt as float; every other column is parsed via _csv_to_float() (so '---' and empty cells become NaN). A missing file yields [].

Columns absent from the file are absent from the row dicts — callers that need legacy compatibility should fill in defaults themselves.
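The typed-row parsing described above can be sketched with the standard csv module. csv_to_float, parse_timeseries and read_timeseries are hypothetical names, and the exact set of cells treated as missing ('---' and empty) is an assumption about the private _csv_to_float().

```python
import csv
import io
import math
from pathlib import Path

MISSING_CELLS = {"", "---"}  # assumption: what the real _csv_to_float treats as missing


def csv_to_float(cell):
    """Hypothetical analogue of _csv_to_float(): missing markers become NaN."""
    cell = (cell or "").strip()
    return math.nan if cell in MISSING_CELLS else float(cell)


def parse_timeseries(text):
    rows = []
    for raw in csv.DictReader(io.StringIO(text)):
        row = {}
        for column, cell in raw.items():
            if column == "step":
                row[column] = int(cell)
            elif column in ("t", "dt"):
                row[column] = float(cell)
            else:
                row[column] = csv_to_float(cell)
        rows.append(row)
    return rows


def read_timeseries(path):
    path = Path(path)
    return parse_timeseries(path.read_text()) if path.exists() else []


rows = parse_timeseries("step,t,dt,Nu\n0,0.0,0.001,---\n10,0.01,0.001,4.2\n")
```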

append_step(step, t, dt, mesh, mesh_vars, diags, fields)[source]

Write an h5 checkpoint at step and append a timeseries row.

Combines the two operations a time-loop workflow does on every save_every tick — mesh.write_timestep plus append_timeseries_row() — into one call so the caller’s loop body shrinks from four lines to one.

Parameters:
  • step (int) – Step index (used as the h5 filename suffix and the "step" column of the timeseries row).

  • t (float) – Simulated time; written into the "t" column.

  • dt (float) – Step size; written into the "dt" column.

  • mesh (underworld3 Mesh) – Mesh whose write_timestep will be called.

  • mesh_vars (list of MeshVariable) – The variables to checkpoint. Passed to mesh.write_timestep(meshVars=...).

  • diags (dict) – Diagnostic columns to write alongside step/t/dt. Keys should match the workflow’s timeseries schema.

  • fields (tuple of str) – The timeseries CSV column order — same value passed to append_timeseries_row().

Return type:

None

append_timeseries_row(row, fields)[source]

Append one row to timeseries.csv, with schema migration.

If the file exists with a header that doesn’t match fields, the file is rewritten in place under the new schema (with missing columns serialising as '---'). Otherwise the row is appended, with the header auto-added on first write.

None and non-finite floats in row are written as '---' (visually distinct from numeric zero).

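Parameters:
  • row (dict) – Column values for this row, keyed by column name.

  • fields (tuple of str) – The timeseries CSV column order.

Return type: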

None
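The append-with-migration behaviour can be sketched with csv.DictWriter, whose restval and extrasaction arguments handle missing and dropped columns. append_row_sketch is a hypothetical name, and discarding columns that disappear from the new schema is an assumption about how the real method handles removals.

```python
import csv
import math
import tempfile
from pathlib import Path

MISSING = "---"


def _cell(value):
    """None and non-finite floats are written as '---', not as 0 or 'nan'."""
    if value is None or (isinstance(value, float) and not math.isfinite(value)):
        return MISSING
    return value


def append_row_sketch(path, row, fields):
    """Hypothetical analogue of Run.append_timeseries_row()."""
    path, fields = Path(path), list(fields)
    if path.exists():
        with path.open(newline="") as fh:
            reader = csv.DictReader(fh)
            header = reader.fieldnames or []
            old_rows = list(reader)
        if header != fields:
            # Schema migration: rewrite existing rows under the new header.
            # Columns they lack become '---'; dropped columns are discarded.
            with path.open("w", newline="") as fh:
                writer = csv.DictWriter(
                    fh, fieldnames=fields, restval=MISSING, extrasaction="ignore"
                )
                writer.writeheader()
                writer.writerows(old_rows)
    with path.open("a", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=fields, restval=MISSING, extrasaction="ignore")
        if fh.tell() == 0:  # new or empty file: write the header first
            writer.writeheader()
        writer.writerow({key: _cell(value) for key, value in row.items()})


csv_path = Path(tempfile.mkdtemp()) / "timeseries.csv"
append_row_sketch(csv_path, {"step": 0, "t": 0.0, "dt": 0.1}, ("step", "t", "dt"))
# A later version of the workflow adds an Nu column; the existing row is migrated.
append_row_sketch(
    csv_path, {"step": 1, "t": 0.1, "dt": 0.1, "Nu": float("nan")}, ("step", "t", "dt", "Nu")
)
lines = csv_path.read_text().splitlines()
```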

property summary_path: Path
property summary: dict | None

Contents of run_summary.yaml, or None if absent.

write_summary(summary)[source]

Write summary to run_summary.yaml.

Parameters:

summary (dict)

Return type:

None

archive()[source]

Rename the directory to <name>.archive-<UTC stamp>/.

Never deletes. Returns the new archive path, or None if the directory doesn’t exist.

Return type:

Path | None
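The rename-never-delete behaviour can be sketched with pathlib. archive_run_dir is a hypothetical name, and the timestamp format is an assumption; the docs only say the suffix is a UTC stamp.

```python
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def archive_run_dir(run_dir):
    """Hypothetical analogue of Run.archive(): rename, never delete."""
    run_dir = Path(run_dir)
    if not run_dir.exists():
        return None
    # Assumption: compact UTC stamp; the real format may differ.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    target = run_dir.with_name(f"{run_dir.name}.archive-{stamp}")
    run_dir.rename(target)
    return target


# Demonstration in a throwaway directory.
root = Path(tempfile.mkdtemp())
run_dir = root / "convection_Ra1e5"
run_dir.mkdir()
(run_dir / "manifest.yaml").write_text("workflow: convection\n")

archived = archive_run_dir(run_dir)
```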

Manifest

Read-only view onto a run directory’s manifest.yaml with convenience properties for workflow, config_hash, config_snapshot, started_at, workflow_api, cache_key, and inputs.

class underworld3.workflows.Manifest[source]

Bases: object

Wrapper around manifest.yaml contents.

The wrapped data dict is whatever the workflow chose to write; no schema is enforced at this level (the package API is still pre-1.0, and richer validation will come when a second consumer tightens the contract). Convenience properties expose the keys the convection workflow uses today plus the package-level workflow_api stamp injected by Run.write_manifest().

data: dict
classmethod read(run_dir)[source]

Load manifest.yaml from run_dir or return None.

Return type:

Manifest | None

write(run_dir)[source]

Write manifest.yaml into run_dir.

Return type:

None

get(key, default=None)[source]
property workflow
property workflow_api

Package __api_version__ recorded at write time, or None for manifests written before the package added the stamp.

property config_hash
property config_snapshot: dict
property cache_key: str | None

Cache-key digest for the product this manifest describes.

Pre-0.2 manifests don’t carry this; None then means “treat as always-fresh” (the legacy behaviour).

property inputs: dict

Inputs the cache_key was derived from (config + upstream product cache keys), or {} for pre-0.2 manifests.

property started_at
__init__(data=<factory>)
Parameters:

data (dict)

Return type:

None

RUN_NAME

The filename stem used by Run for its h5/xdmf chain (<RUN_NAME>.mesh.NNNNN.h5). Defaults to "run".

underworld3.workflows.RUN_NAME = 'run'
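Reading the filename pattern above literally, checkpoint names and step indices can be mapped back and forth as below. The five-digit zero padding is inferred from NNNNN, and both helpers are hypothetical; note that Run.steps itself derives indices from the xdmf files, whose names may follow a different pattern.

```python
import re

RUN_NAME = "run"  # mirrors underworld3.workflows.RUN_NAME
MESH_FILE_RE = re.compile(rf"^{re.escape(RUN_NAME)}\.mesh\.(\d+)\.h5$")


def mesh_checkpoint_name(step: int, run_name: str = RUN_NAME) -> str:
    """Hypothetical helper: filename of the mesh checkpoint for a step."""
    return f"{run_name}.mesh.{step:05d}.h5"  # assumption: 5-digit zero padding


def steps_from_filenames(names):
    """Hypothetical helper: sorted step indices recovered from filenames."""
    steps = []
    for name in names:
        match = MESH_FILE_RE.match(name)
        if match:
            steps.append(int(match.group(1)))
    return sorted(steps)
```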


Runner

WorkflowRunner

Resolves the DAG of @workflow_step-decorated functions in a workflow module, building products on demand and caching them in memory and on disk via WorkflowProducts.

Key methods:

  • build(name) / get(name) — return product name, building if needed. Synonym pair.

  • build_all() — build every leaf product.

  • rebuild(name) — invalidate and rebuild.

  • invalidate(name) — drop a product from cache + disk.

  • status(name) returns one of "cached", "on_disk", "missing".

  • dag() — display steps with status (HTML in Jupyter, plain text in a terminal).

  • diagram() — Graphviz DOT source for the runner’s DAG with per-product status colours.

  • observe(callback) — register a hook fired on cache/load/build events.

  • what_invalidates(name) — set of products that would rebuild if name changed.

class underworld3.workflows.WorkflowRunner[source]

Bases: object

Resolve and run workflow steps in dependency order.

Parameters:
  • module (module) – A workflow module (e.g. import convection_config as convection).

  • config (WorkflowConfig) – Configuration object passed to every step whose signature includes a config parameter.

  • products (WorkflowProducts, optional) – Persistence layer. If omitted, products live in memory only.
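Resolving steps in dependency order amounts to a depth-first walk over requires with memoisation. The sketch below shows that idea in isolation. SketchRunner and its steps dict are hypothetical; there is no on-disk persistence or cache-key check, and passing upstream products positionally in requires order is an assumption modelled on the adapt_mesh(mesh, faults, config) example under workflow_step.

```python
import inspect


class SketchRunner:
    """Hypothetical in-memory resolver illustrating dependency-order builds.

    steps maps each product name to (function, requires); the real
    WorkflowRunner discovers this from @workflow_step metadata instead.
    """

    def __init__(self, steps, config):
        self.steps = steps
        self.config = config
        self.cache = {}      # in-memory products (the runner's "cached" state)
        self.build_log = []  # order in which products were actually built

    def get(self, name, _visiting=None):
        if name in self.cache:
            return self.cache[name]
        visiting = set() if _visiting is None else _visiting
        if name in visiting:
            raise ValueError(f"dependency cycle through {name!r}")
        visiting.add(name)
        fn, requires = self.steps[name]
        # Build upstream products first; pass them positionally in requires order.
        upstream = [self.get(dep, visiting) for dep in requires]
        visiting.discard(name)
        # Pass config only to steps whose signature asks for it.
        wants_config = "config" in inspect.signature(fn).parameters
        value = fn(*upstream, **({"config": self.config} if wants_config else {}))
        self.cache[name] = value
        self.build_log.append(name)
        return value


steps = {
    "mesh": (lambda config: f"mesh(h={config['cellsize']})", []),
    "temperature_initial": (lambda mesh, config: f"T0 on {mesh}", ["mesh"]),
    "evolution_log": (lambda mesh, T0: f"log[{T0}]", ["mesh", "temperature_initial"]),
}
runner = SketchRunner(steps, config={"cellsize": 0.1})
result = runner.get("evolution_log")
```

Because "mesh" is cached after its first build, the diamond (both downstream products need it) builds it only once.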

Examples

>>> runner = WorkflowRunner(convection, config, products)
>>> runner.build("evolution_log")     # resolves all dependencies
>>> runner.dag()                       # show steps + status
>>> runner.rebuild("temperature_initial")  # invalidate + rebuild

__init__(module, config, products=None)[source]
get(name)[source]

Return product name, building (and caching) if needed.

When the config supports cache_key-based freshness tracking, a disk-cached product is only returned if its recorded cache key matches what the current config expects; mismatch triggers a rebuild. Configs without _identity_fields declared fall back to existence-based caching (legacy behaviour).

Parameters:

name (str)

build(name)

Synonym for get(): returns product name, building (and caching) it if needed, using the same cache-key freshness rules.

Parameters:

name (str)

build_all()[source]

Build every leaf product, i.e. every product that no other step requires.

Returns the list of leaf product names that were built.
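The leaf definition is a set difference over the requires graph. This sketch uses a hypothetical requires_map (product name to list of required products) in place of the module's step metadata; the real method also builds each leaf, and may return them in a different order.

```python
def leaf_products(requires_map):
    """Products that appear in no other product's requires list."""
    required = {dep for deps in requires_map.values() for dep in deps}
    return sorted(name for name in requires_map if name not in required)


requires_map = {
    "mesh": [],
    "temperature_initial": ["mesh"],
    "evolution_log": ["mesh", "temperature_initial"],
    "mesh_figure": ["mesh"],
}
leaves = leaf_products(requires_map)
```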

invalidate(name)[source]

Drop name from cache and remove its on-disk product if any.

Does not cascade to downstream products; the caller is responsible for rebuilding anything that depended on name.

Parameters:

name (str)

rebuild(name)[source]

Invalidate and rebuild a single product.

Parameters:

name (str)

status(name)[source]

Return one of 'cached', 'on_disk', 'missing'.

Parameters:

name (str)

Return type:

str

observe(callback)[source]

Register a callback fired on product cache/load/build events.

Each registered callback is invoked as callback(event, name) where event is one of:

  • "cached" — returned from in-memory cache

  • "loaded" — read from on-disk products

  • "building" — about to run the producing step

  • "built" — step finished, product cached and (if persistable) saved

Callbacks that raise are silently ignored so a buggy observer can’t break the runner.

Used by interactive UI layers (Jupyter widgets, panel dashboards) that want to display DAG progress without polling.

Return type:

None
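The callback contract, callback(event, name) with exceptions swallowed, is a small observer pattern. A sketch under that contract follows; ObserverHub and notify are hypothetical names, and with a real runner you would pass the same kind of callback to runner.observe().

```python
class ObserverHub:
    """Hypothetical stand-in showing the observe()/notify contract."""

    def __init__(self):
        self._callbacks = []

    def observe(self, callback):
        self._callbacks.append(callback)

    def notify(self, event, name):
        for callback in list(self._callbacks):
            try:
                callback(event, name)
            except Exception:
                # A failing observer is ignored so it cannot break the build.
                pass


events = []
hub = ObserverHub()
hub.observe(lambda event, name: events.append((event, name)))
hub.observe(lambda event, name: 1 / 0)  # buggy observer: raises, silently ignored
for event in ("building", "built"):
    hub.notify(event, "mesh")
```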

diagram(*, rankdir='LR')[source]

Generate a Graphviz DOT diagram of this runner’s DAG.

Nodes are colour-coded by current status (cached / on-disk / missing) using status() as the status provider.

Returns the DOT source string; render with the dot command:

from pathlib import Path
Path("dag.dot").write_text(runner.diagram())
# then in a shell: dot -Tpng dag.dot -o dag.png

Or use underworld3.workflows.render() for a one-call wrapper that invokes dot directly.

Parameters:

rankdir (str)

Return type:

str

what_invalidates(name)[source]

Set of products that would rebuild if name changed.

Walks the produces/requires DAG forward from name and returns every product (transitively) that lists name in its requires chain. Useful for UI layers showing “if you change X, these will rebuild”.

name itself is not included in the returned set.

Parameters:

name (str)

Return type:

set
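This set is forward reachability over the reversed requires edges, and a breadth-first search is one straightforward way to compute it. downstream_of and requires_map are hypothetical; the real method reads the graph from the workflow module. Because invalidate() does not cascade, a set like this is what a caller would iterate over to clear downstream products by hand.

```python
from collections import deque


def downstream_of(requires_map, name):
    """Every product that (transitively) requires `name`, excluding `name`."""
    dependents = {}
    for product, deps in requires_map.items():
        for dep in deps:
            dependents.setdefault(dep, set()).add(product)
    found, queue = set(), deque([name])
    while queue:
        current = queue.popleft()
        for product in dependents.get(current, ()):
            if product not in found:
                found.add(product)
                queue.append(product)
    found.discard(name)  # guards against cycles leading back to name
    return found


requires_map = {
    "mesh": [],
    "temperature_initial": ["mesh"],
    "evolution_log": ["mesh", "temperature_initial"],
    "nusselt_plot": ["evolution_log"],
    "unrelated": [],
}
```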

dag()[source]

Display all steps with their produces / requires / status.

In Jupyter this renders as an HTML table; falls back to plain text in a terminal.

Products

WorkflowProducts

The on-disk persistence layer. Type-aware save/load (Mesh → HDF5, Run → directory pointer, Path → file pointer, ndarray → NPZ, …). Maintains a YAML manifest with per-product cache keys and input audit.

class underworld3.workflows.WorkflowProducts[source]

Bases: object

Manages workflow product persistence (make-like).

Products are named objects (meshes, variables, surfaces) saved to a products directory. Each product has a type-aware save/load method. A YAML manifest tracks what exists.

Parameters:
  • config (WorkflowConfig, optional) – If provided, products_dir defaults to config.output_dir / "products".

  • products_dir (str or Path, optional) – Explicit products directory. Overrides config.

__init__(config=None, products_dir=None)[source]
property products_dir
save(name, obj, *, cache_key=None, inputs=None, **metadata)[source]

Save a product to disk.

Type dispatch:

  • Mesh — HDF5 via write_timestep() (vertex-based, portable)

  • MeshVariable — HDF5 via write_timestep() with the mesh

  • Surface — VTK via .save()

  • SurfaceCollection — directory of VTK files

  • dict of Surfaces — directory of VTK files

  • ndarray → .npz

Parameters:
  • name (str) – Product name (used as filename stem and manifest key).

  • obj (object) – The object to save.

  • cache_key (str, optional) – Hex digest of this product’s inputs. Used by fresh() to decide whether a future caller’s expected inputs match the cached version. Pre-0.2 callers pass nothing here; the entry is then treated as “always fresh” on read (legacy behaviour).

  • inputs (dict, optional) – The inputs cache_key was derived from — typically {"config": {...}, "requires": {upstream_name: cache_key}}. Persisted alongside the digest so future readers can audit what changed.

  • **metadata – Extra key-value pairs stored in the manifest entry.

load(name, mesh=None, mesh_variable=None)[source]

Load a product by name.

Parameters:
  • name (str) – Product name (must exist in manifest).

  • mesh (Mesh, optional) – Required for loading Surfaces that need a mesh reference.

  • mesh_variable (MeshVariable, optional) – For mesh_variable products: the target variable to load data into (via read_timestep). Must already exist on a mesh.

Returns:

The loaded product.

Return type:

object

exists(name)[source]

Check if a product exists in the manifest and on disk.

Parameters:

name (str)

Return type:

bool

cache_key_for(name)[source]

Return the cache key recorded for product name.

None if the product has never been saved, or if it was saved before cache-key tracking existed (pre-0.2 manifest). Callers that treat None as “fresh” preserve the legacy behaviour.

Parameters:

name (str)

Return type:

str | None

fresh(name, expected_cache_key)[source]

True iff name exists with the expected cache key.

Used by WorkflowRunner to decide whether a cached product is still valid for the current inputs. Mismatched cache key ⇒ rebuild; missing entry ⇒ rebuild; pre-0.2 manifest entry (no cache_key recorded) ⇒ treated as fresh (the legacy behaviour, so this check is never stricter than existence-based caching was).

Parameters:
  • name (str)

  • expected_cache_key (str)

Return type:

bool
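The three-way freshness rule can be written as a small predicate over a manifest dict. is_fresh and the manifest layout are hypothetical; unlike the real fresh(), this sketch does not check that the product's files are still on disk.

```python
def is_fresh(manifest, name, expected_cache_key):
    """Hypothetical analogue of WorkflowProducts.fresh() over a manifest dict."""
    entry = manifest.get(name)
    if entry is None:
        return False                       # missing entry: rebuild
    recorded = entry.get("cache_key")
    if recorded is None:
        return True                        # pre-0.2 entry: legacy, treat as fresh
    return recorded == expected_cache_key  # mismatch: rebuild


manifest = {
    "mesh": {"cache_key": "32f71431391477a7"},
    "old_product": {},  # saved before cache keys were recorded
}
```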

list()[source]

Display available products.

In Jupyter renders as an HTML table; falls back to plain text.

status(module=None)[source]

Show which products exist vs need building.

If module is provided (a workflow module with @workflow_step decorated functions), cross-references product names from step metadata against the manifest.

Parameters:

module (module, optional) – A workflow module to scan for produces/requires metadata.

remove(name)[source]

Remove a product from disk and the manifest.

Parameters:

name (str) – Product name to remove.

clear()[source]

Remove all products from disk and the manifest.

CLI helper

cli_from_config

Auto-derive an argparse parser from a WorkflowConfig subclass. Every Pydantic field of a supported type becomes a CLI flag, using this type-aware mapping:

Pydantic type      CLI form
bool               --flag / --no-flag (BooleanOptionalAction)
int, float, str    typed value
Literal[...]       choices= constraint
anything else      silently skipped

Most workflows use this to build their CLI driver in a few lines:

from underworld3.workflows import cli_from_config, config_from_args
import my_workflow_config as mw

parser = cli_from_config(mw.MyConfig)
parser.add_argument("--no-evolve", action="store_true")
args = parser.parse_args()
config = config_from_args(mw.MyConfig, args)

underworld3.workflows.cli_from_config(ConfigClass, *, description=None, hidden_fields=frozenset({'description', 'workflow_name'}))[source]

Build an argparse parser auto-derived from ConfigClass’ fields.

Parameters:
  • ConfigClass (type) – A WorkflowConfig subclass (or any Pydantic BaseModel with model_fields).

  • description (str, optional) – Parser description. Defaults to the class’ docstring’s first line, if any.

  • hidden_fields (iterable of str) – Field names that should not be exposed on the CLI. Defaults to {"workflow_name", "description"} — the workflow-itself metadata. Callers can pass their own list if they want to suppress (or expose) different fields.

Returns:

parser – Pre-populated with one argument per recognised Pydantic field. Callers can add_argument further (e.g. action flags like --movies or --no-evolve) before parsing.

Return type:

argparse.ArgumentParser

underworld3.workflows.config_from_args(ConfigClass, args, *, hidden_fields=frozenset({'description', 'workflow_name'}))[source]

Construct a ConfigClass instance from parsed CLI args.

None values (i.e. fields the user didn’t override on the command line) drop through to the config’s default. Fields not present in args are silently ignored.

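Parameters:
  • ConfigClass (type) – The WorkflowConfig subclass to instantiate.

  • args (Namespace) – Parsed arguments, typically from a parser built by cli_from_config().

  • hidden_fields (iterable of str) – Field names that are not read from args. Defaults to {"workflow_name", "description"}.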
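The type mapping and the "None means not overridden" convention can be reproduced with plain argparse. To stay self-contained, this sketch uses a dataclass as a stand-in for a Pydantic model (dataclasses.fields instead of model_fields); parser_from_fields, config_from_namespace and DemoConfig are hypothetical, and the underscore flag spelling (--free_slip) is an assumption. argparse.BooleanOptionalAction requires Python 3.9 or later.

```python
import argparse
import dataclasses
from typing import Literal, get_args, get_origin

HIDDEN = frozenset({"workflow_name", "description"})


def parser_from_fields(cls, hidden=HIDDEN):
    parser = argparse.ArgumentParser()
    for field in dataclasses.fields(cls):
        if field.name in hidden:
            continue
        flag = f"--{field.name}"
        # default=None lets config_from_namespace tell "not given" from a value.
        if field.type is bool:
            parser.add_argument(flag, action=argparse.BooleanOptionalAction, default=None)
        elif field.type in (int, float, str):
            parser.add_argument(flag, type=field.type, default=None)
        elif get_origin(field.type) is Literal:
            parser.add_argument(flag, choices=get_args(field.type), default=None)
        # Any other annotation gets no flag, matching the documented mapping.
    return parser


def config_from_namespace(cls, args, hidden=HIDDEN):
    overrides = {
        field.name: getattr(args, field.name)
        for field in dataclasses.fields(cls)
        if field.name not in hidden and getattr(args, field.name, None) is not None
    }
    return cls(**overrides)  # fields left as None fall back to the class defaults


@dataclasses.dataclass
class DemoConfig:
    workflow_name: str = "demo"
    rayleigh: float = 1e5
    resolution: int = 32
    free_slip: bool = True
    solver: Literal["direct", "iterative"] = "direct"
    tags: list = dataclasses.field(default_factory=list)  # unsupported: no flag


args = parser_from_fields(DemoConfig).parse_args(
    ["--rayleigh", "1e6", "--no-free_slip", "--solver", "iterative"]
)
config = config_from_namespace(DemoConfig, args)
```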

DAG diagrams

diagram

Generate a Graphviz DOT source string for a workflow module’s produces/requires graph. Pass to dot -Tpng (or -Tsvg / -Tpdf) to render.

underworld3.workflows.diagram(module, status_provider=None, *, title=None, rankdir='LR')[source]

Generate a Graphviz DOT source string for module’s DAG.

Parameters:
  • module (module) – A workflow module (e.g. import convection_config as cc; pass cc).

  • status_provider (callable, optional) – status_provider(name) → status string for product name. Recognised statuses (and their fill colours) are listed in _STATUS_COLOURS — anything else falls through to the default “missing” grey. Typically wired to WorkflowRunner.status().

  • title (str, optional) – Graph title. Defaults to module.__name__.

  • rankdir (str) – Graphviz rankdir: "LR" (left to right, the default) or "TB" (top to bottom).

Returns:

dot_source – Compile with dot -Tpng input.dot -o output.png (or -Tsvg) to render the diagram.

Return type:

str
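DOT is a plain-text format, so generating it needs no Graphviz bindings. The sketch below emits one filled node per product and one edge per requires entry. dot_from_requires, its colour table and its node styling are hypothetical and will not reproduce the real diagram() output exactly; the result can be compiled with dot -Tpng as described above.

```python
# Hypothetical colours; the real mapping lives in the private _STATUS_COLOURS.
STATUS_COLOURS = {"cached": "palegreen", "on_disk": "lightblue", "missing": "lightgrey"}


def dot_from_requires(requires_map, status_provider=None, title="workflow", rankdir="LR"):
    lines = [
        f'digraph "{title}" {{',
        f"  rankdir={rankdir};",
        "  node [shape=box, style=filled];",
    ]
    for product in sorted(requires_map):
        status = status_provider(product) if status_provider else "missing"
        # Unknown statuses fall through to the "missing" colour.
        colour = STATUS_COLOURS.get(status, STATUS_COLOURS["missing"])
        lines.append(f'  "{product}" [fillcolor={colour}];')
    for product in sorted(requires_map):
        for dep in requires_map[product]:
            lines.append(f'  "{dep}" -> "{product}";')  # edge points downstream
    lines.append("}")
    return "\n".join(lines)


requires_map = {"mesh": [], "temperature_initial": ["mesh"]}
dot = dot_from_requires(
    requires_map, status_provider=lambda n: "cached" if n == "mesh" else "missing"
)
```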

render

One-call wrapper around diagram plus the dot binary — generates a rendered PNG/SVG/PDF directly. Requires Graphviz on PATH.

underworld3.workflows.render(module, output_path, *, format=None, status_provider=None, rankdir='LR')[source]

Render module’s DAG to an image file via the dot command.

Convenience wrapper around diagram() plus subprocess. Requires the dot binary on PATH (apt install graphviz, brew install graphviz, etc.).

Parameters:
  • module (module) – A workflow module.

  • output_path (str or Path) – Destination file. Format inferred from the extension (.png, .svg, .pdf) unless overridden by format.

  • format (str, optional) – Graphviz output format, e.g. "png", "svg", "pdf". Inferred from output_path extension if not given.

  • status_provider (Callable[[str], str] | None) – Forwarded to diagram().

  • rankdir (str) – Forwarded to diagram().

Returns:

output_path – Resolved path of the written image.

Return type:

Path

Raises:

FileNotFoundError – If the dot command is not on PATH.

Discovery

view

Display the workflow steps and config classes in a workflow module. Convenient inside a Jupyter notebook (renders an HTML table); falls back to plain text in a terminal.

underworld3.workflows.view(module)[source]

Display the workflow steps defined in module.

Scans module for functions decorated with @workflow_step and lists them with their descriptions. In Jupyter this renders as an HTML table; in a terminal as plain text.

Parameters:

module (module) – A workflow module (e.g. import convection_config as convection; uw.workflows.view(convection)).

list_workflows / init_workflow

Discover available workflows on the system; scaffold a new workflow into a target directory. See underworld3.workflows.scaffold for details.

underworld3.workflows.list_workflows(repo_root=None)[source]

Discover all available workflows (builtin + external).

See underworld3.workflows.scaffold for details.

underworld3.workflows.init_workflow(name, target_dir='.', repo_root=None, force=False)[source]

Scaffold a workflow into a target directory.

See underworld3.workflows.scaffold for details.

Utilities

check_dependencies

Check that optional packages a workflow needs are installed, and raise a clear error with install instructions if not. Call it at the top of a workflow module so users don’t hit a confusing ImportError deep inside a solve.

underworld3.workflows.check_dependencies(packages)[source]

Check that optional packages are importable; raise with install hints.

Parameters:

packages (dict) – Mapping of import name to install instruction, e.g. {"geopandas": "pip install geopandas"}.

Raises:

ImportError – If any package cannot be imported.

Return type:

None
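A check like this usually comes down to importlib.import_module inside a try block. The sketch below collects every missing package before raising; require_packages is a hypothetical name, and whether the real check_dependencies reports all missing packages or stops at the first is not stated here. In a workflow module the real call looks like check_dependencies({"geopandas": "pip install geopandas"}).

```python
import importlib


def require_packages(packages):
    """Hypothetical analogue of check_dependencies(): report every missing package."""
    missing = []
    for import_name, install_hint in packages.items():
        try:
            importlib.import_module(import_name)
        except ImportError:
            missing.append(f"  {import_name}: {install_hint}")
    if missing:
        raise ImportError("Missing optional dependencies:\n" + "\n".join(missing))


require_packages({"json": "(standard library)"})  # importable: no error

try:
    require_packages({"no_such_package_xyz": "pip install no-such-package-xyz"})
except ImportError as err:
    message = str(err)
```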

parse_quantity

Parse a quantity string ("50 km", "1e21 Pa*s") into a uw.quantity. Used internally by WorkflowConfig.setup_model.

underworld3.workflows.parse_quantity(s)[source]

Parse a string like "1000 km" into a uw.quantity.

Parameters:

s (str) – Value and unit separated by whitespace, e.g. "1e21 Pa*s".

Returns:

The parsed quantity.

Return type:

UWQuantity
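The accepted input is a number, whitespace, then a unit expression. The sketch below shows only that splitting step; split_quantity is hypothetical, and the real parse_quantity goes on to build a uw.quantity from the two parts, which is not reproduced here. In this sketch a bare number with no unit raises ValueError.

```python
def split_quantity(text):
    """Hypothetical helper: split "value unit" into (float, unit string)."""
    value, unit = text.strip().split(maxsplit=1)
    return float(value), unit.strip()
```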

show_source

Display the source of a workflow function. Useful for notebook-side introspection without leaving the cell.

underworld3.workflows.show_source(fn)[source]

Display the source code of fn with syntax highlighting.

In Jupyter the source is rendered as a Python code block via IPython.display.Markdown. In a terminal it falls back to plain print.

Works on any function or method, not just workflow steps.

Parameters:

fn (callable) – The function whose source you want to inspect.

Return type:

None