Workflows¶
The underworld3.workflows package provides infrastructure for
organising simulations as DAGs of cached, idempotent computations.
For the conceptual overview see the
workflow concepts guide;
the convection example
walks through one workflow end to end.
Workflow infrastructure for domain-specific simulation packages.
Provides a WorkflowConfig base class and small utilities so that
external packages (e.g. uw3-hydrogen, uw3-groundwater) can
define validated, serializable parameter sets on top of Underworld3.
See docs/developer/guides/workflow-packages.md for the full pattern.
The package’s public API version is exposed as
underworld3.workflows.__api_version__. Every manifest written
by Run.write_manifest or WorkflowProducts.save carries this
stamp under the key workflow_api so older artefacts can be
identified.
Configuration¶
WorkflowConfig¶
The Pydantic base class every workflow’s config inherits from.
Subclasses declare _identity_fields to mark which fields
invalidate cached products on change.
- class underworld3.workflows.WorkflowConfig[source]¶
Bases: BaseModel
Base for domain-specific workflow configurations.
Subclass this in your workflow package to define validated, serializable parameter sets. The base class provides:
- Standard metadata fields (name, description, output directory).
- Optional reference-quantity strings that setup_model parses into uw.quantity objects and registers on the global Model.
- YAML round-trip serialisation.
Example
>>> from underworld3.workflows import WorkflowConfig
>>> class MyConfig(WorkflowConfig):
...     depth_km: float = 100.0
...     viscosity: float = 1e21
...
>>> cfg = MyConfig(workflow_name="demo", ref_length="100 km")
>>> cfg.save_yaml("params.yaml")
>>> cfg2 = MyConfig.from_yaml("params.yaml")
- model_config: ClassVar[ConfigDict] = {'extra': 'allow'}¶
Pydantic model configuration; must be a dictionary conforming to pydantic.config.ConfigDict.
- cache_key(requires=None)[source]¶
Cache key derived from this config’s identity fields.
Returns None if the subclass has not declared _identity_fields. This signals to the workflow runner that input-hash freshness tracking is not enabled and that it should fall back to existence-based caching (legacy behaviour from before this base method existed).
- Parameters:
requires (mapping of str → str, optional) – Cache keys of upstream products this product depends on, keyed by product name. Folded into the digest so an upstream invalidation propagates downstream.
- Return type:
str | None
- view()[source]¶
Display configuration as a formatted table.
In Jupyter this renders as an HTML table via
IPython.display. In a terminal it falls back to a plain-text table.
- setup_model(name=None)[source]¶
Create (or reset) a uw.Model with reference quantities from this config.
- Parameters:
name (str, optional) – Model name. Defaults to self.workflow_name.
- Return type:
uw.Model
- save_yaml(path)[source]¶
Write configuration to a YAML file.
- Parameters:
path (str or Path) – Destination file path.
- Return type:
None
config_cache_key / config_snapshot¶
Helpers used by WorkflowConfig.cache_key() and the runner.
Most code shouldn’t call these directly, but they’re exported for
custom workflows that need fine-grained control.
- underworld3.workflows.config_cache_key(config, identity_fields, requires=None)[source]¶
Return a cache key derived from config’s identity fields.
- Parameters:
config (Pydantic BaseModel-like) – The configuration object. Each name in identity_fields is read off it via getattr.
identity_fields (iterable of str) – The names of fields whose values determine the cache key. Other config fields (e.g. operational tolerances) are ignored.
requires (mapping str -> str, optional) – Cache keys of upstream products this one depends on, keyed by product name. Folded into the digest so an upstream invalidation propagates downstream.
- Returns:
digest – Hex digest of length _DIGEST_LEN.
- Return type:
Examples
>>> # Convection's identity hash
>>> config_cache_key(config, ["rayleigh", "cellsize", "T_degree"])
'32f71431391477a7'
>>> # A downstream aggregation depending on multiple upstream products
>>> config_cache_key(
...     sweep_config,
...     ["rayleigh_values", "aspect_ratios"],
...     requires={"run_summary_Ra1e4": "abc...", "run_summary_Ra1e5": "def..."},
... )
- underworld3.workflows.config_snapshot(config, identity_fields)[source]¶
Return {field: getattr(config, field)} for the listed fields.
Mirror of config_cache_key()’s input handling: the snapshot is what the cache key was derived from, persisted alongside the digest so a future reader can audit what changed.
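The identity-field hashing described for config_cache_key and config_snapshot can be illustrated with a small standalone sketch. It does not import underworld3 and is not the package's implementation: the function names, the JSON serialisation, the SHA-256 hash and the 16-character truncation (chosen to match the length of the digest in the example) are all assumptions.

```python
import hashlib
import json
from types import SimpleNamespace

DIGEST_LEN = 16  # assumption: matches the 16-character digest shown in the example


def sketch_snapshot(config, identity_fields):
    """Collect the identity fields off the config via getattr."""
    return {field: getattr(config, field) for field in identity_fields}


def sketch_cache_key(config, identity_fields, requires=None):
    """Hash the identity snapshot plus upstream cache keys (hypothetical scheme)."""
    payload = {
        "config": sketch_snapshot(config, identity_fields),
        "requires": dict(requires or {}),
    }
    # sort_keys makes the serialisation independent of dict insertion order
    blob = json.dumps(payload, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()[:DIGEST_LEN]


cfg = SimpleNamespace(rayleigh=1e4, cellsize=0.1, tolerance=1e-6)
identity = ["rayleigh", "cellsize"]

key_a = sketch_cache_key(cfg, identity)
cfg.tolerance = 1e-8  # not an identity field: the key is unchanged
key_b = sketch_cache_key(cfg, identity)
cfg.rayleigh = 1e5  # identity field: the key changes
key_c = sketch_cache_key(cfg, identity)
# Folding in an upstream product's key changes the digest again
key_d = sketch_cache_key(cfg, identity, requires={"mesh": "abc123"})
```

Persisting the snapshot next to the digest is what makes a mismatch auditable: the digest only says that something changed, while the snapshot shows which field did.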
Step decorator¶
workflow_step¶
Mark a function as a workflow step with declared produces /
requires lists. The runner walks these to resolve the DAG.
- underworld3.workflows.workflow_step(fn=None, *, description=None, produces=None, requires=None)[source]¶
Mark a function as a workflow helper step.
Attaches a .view() method that displays the function source (syntax-highlighted in Jupyter). The decorator is transparent: the wrapped function behaves identically to the original.
Can be used with or without arguments:
@workflow_step
def create_mesh(config):
    ...

@workflow_step(description="Build the simulation mesh")
def create_mesh(config):
    ...

@workflow_step(
    description="Adapt mesh near fault surfaces",
    produces=["adapted_mesh"],
    requires=["mesh", "fault_surfaces"],
)
def adapt_mesh(mesh, faults, config):
    ...
The description is stored as fn.workflow_description and the produces/requires lists document the DAG of product dependencies (used by view() and WorkflowProducts).
- Parameters:
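A decorator that works both bare and with keyword arguments, as workflow_step is described to, is usually written with an optional first positional argument. The following is a minimal standalone sketch of that pattern, not the package's code. Only workflow_description is named in the text; the attribute names workflow_produces and workflow_requires are assumptions made for illustration, and the .view() method is omitted.

```python
import functools


def sketch_step(fn=None, *, description=None, produces=None, requires=None):
    """Decorator usable bare (@sketch_step) or with keyword arguments."""

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)  # transparent: behaviour is unchanged

        wrapper.workflow_description = description
        # The two attribute names below are assumptions made for this sketch.
        wrapper.workflow_produces = list(produces or [])
        wrapper.workflow_requires = list(requires or [])
        return wrapper

    # Bare use passes the function positionally; parameterised use passes None.
    return decorate(fn) if fn is not None else decorate


@sketch_step
def create_mesh(config):
    return f"mesh({config})"


@sketch_step(description="Adapt mesh", produces=["adapted_mesh"], requires=["mesh"])
def adapt_mesh(mesh, config):
    return f"adapted({mesh})"
```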
Run-directory primitives¶
Run¶
Thin wrapper around an on-disk run directory: manifest, h5 chain, timeseries CSV, summary. Used by time-loop workflows where the run directory is itself a workflow product.
- class underworld3.workflows.Run[source]¶
Bases: object
A run-output directory: thin wrapper, no IO on construction.
Construction does not touch disk; Run.create() is the helper that makes the directory. Reads (manifest, steps, timeseries, summary) all hit disk fresh on each access, with no caching.
- classmethod open(path)[source]¶
Open an existing run directory.
Currently a thin alias for Run(path): no validation that the path exists or contains a manifest. workflow_api / config_hash validation is deferred to __api_version__ = '1.0', when the contract is firmed up by a second consumer.
- Return type:
- classmethod create(path)[source]¶
Create the directory (parents + exist_ok) and return a Run.
- Return type:
- write_manifest(data)[source]¶
Write manifest.yaml with the given dict.
Auto-injects two fields if the caller hasn’t supplied them:
- workflow_api: the current package __api_version__.
- cache_key: copied from config_hash for time-loop run-directories where the run’s identity is the only input. Workflows producing run-directories that depend on upstream products should set cache_key (and inputs) on the dict explicitly.
Manifests written before either stamp existed read back with manifest.workflow_api is None / manifest.cache_key is None.
- Parameters:
data (dict)
- Return type:
None
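The auto-injection rule in write_manifest amounts to filling in two keys only when the caller left them out. A standalone sketch over a plain dict follows; the helper name, the placeholder version string and the choice to mirror cache_key only when config_hash is present are assumptions, and no YAML is written.

```python
SKETCH_API_VERSION = "0.0"  # placeholder, not the real __api_version__


def stamp_manifest(data, api_version=SKETCH_API_VERSION):
    """Return a copy of data with workflow_api and cache_key filled in if absent."""
    stamped = dict(data)
    stamped.setdefault("workflow_api", api_version)
    # Assumption: cache_key is mirrored only when a config_hash is available.
    if "cache_key" not in stamped and "config_hash" in stamped:
        stamped["cache_key"] = stamped["config_hash"]
    return stamped


# Time-loop run: the config hash doubles as the cache key
auto = stamp_manifest({"workflow": "convection", "config_hash": "32f71431391477a7"})
# Caller supplied an explicit cache key: it is left untouched
explicit = stamp_manifest({"config_hash": "aaaa", "cache_key": "bbbb"})
```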
- load_field(var, step, *, data_name=None)[source]¶
Read the saved field for var at step into var in place.
Thin wrapper around var.read_timestep that fills in this run’s directory and the workflow’s filename stem. The h5 layer does kd-tree interpolation when var’s mesh / FE space differs from what was written, which is what makes the warm-start recipe work across degrees and meshes.
data_name defaults to var’s constructor name (e.g. "T").
- property timeseries: list[dict]¶
Read timeseries.csv as a list of typed-row dicts.
step is parsed as int; t and dt as float; every other column is parsed via _csv_to_float() (so '---' / empty cells become NaN). Missing file → [].
Columns absent from the file are absent from the row dicts; callers that need legacy compatibility should fill in defaults themselves.
- append_step(step, t, dt, mesh, mesh_vars, diags, fields)[source]¶
Write an h5 checkpoint at step and append a timeseries row.
Combines the two operations a time-loop workflow does on every save_every tick (mesh.write_timestep plus append_timeseries_row()) into one call so the caller’s loop body shrinks from four lines to one.
- Parameters:
step (int) – Step index (used as the h5 filename suffix and the "step" column of the timeseries row).
t, dt (float) – Simulated time and step size; written into the "t" and "dt" columns.
mesh (underworld3 Mesh) – Mesh whose write_timestep will be called.
mesh_vars (list of MeshVariable) – The variables to checkpoint. Passed to mesh.write_timestep(meshVars=...).
diags (dict) – Diagnostic columns to write alongside step/t/dt. Keys should match the workflow’s timeseries schema.
fields (tuple of str) – The timeseries CSV column order: the same value passed to append_timeseries_row().
- Return type:
None
- append_timeseries_row(row, fields)[source]¶
Append one row to timeseries.csv, with schema migration.
If the file exists with a header that doesn’t match fields, the file is rewritten in place under the new schema (with missing columns serialising as '---'). Otherwise the row is appended, with the header auto-added on first write.
None and non-finite floats in row are written as '---' (visually distinct from numeric zero).
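The append-with-migration behaviour, together with the '---' to NaN mapping on read, can be sketched with the standard csv module. This is an illustration under assumptions rather than the package's code: the function names are hypothetical, and the rewrite strategy (read all old rows, then write them back under the new header) is one plausible way to achieve what the docstring describes.

```python
import csv
import math
import tempfile
from pathlib import Path

MISSING = "---"


def _cell(value):
    """Serialise None and non-finite floats as the missing marker."""
    if value is None or (isinstance(value, float) and not math.isfinite(value)):
        return MISSING
    return value


def sketch_append_row(path, row, fields):
    """Append row to a CSV, rewriting the file first if its header differs."""
    path = Path(path)
    clean = {k: _cell(v) for k, v in row.items()}
    old_rows = []
    if path.exists():
        with path.open(newline="") as fh:
            reader = csv.DictReader(fh)
            header = reader.fieldnames or []
            old_rows = list(reader)
        if list(header) == list(fields):
            with path.open("a", newline="") as fh:
                csv.DictWriter(fh, fieldnames=fields, restval=MISSING,
                               extrasaction="ignore").writerow(clean)
            return
    # First write, or schema changed: write header, old rows, then the new row.
    with path.open("w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=fields, restval=MISSING,
                                extrasaction="ignore")
        writer.writeheader()
        writer.writerows(old_rows)  # columns missing from old rows become "---"
        writer.writerow(clean)


def sketch_read(path):
    """Read rows back; step as int, marker or empty cells as NaN, rest as float."""
    with Path(path).open(newline="") as fh:
        return [
            {k: (int(v) if k == "step"
                 else float("nan") if v in (MISSING, "") else float(v))
             for k, v in rec.items()}
            for rec in csv.DictReader(fh)
        ]


with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / "timeseries.csv"
    sketch_append_row(csv_path, {"step": 0, "t": 0.0, "dt": 0.1}, ("step", "t", "dt"))
    # A new column appears: the file is migrated and the old row gets "---".
    sketch_append_row(csv_path, {"step": 1, "t": 0.1, "dt": 0.1, "Nu": 4.9},
                      ("step", "t", "dt", "Nu"))
    rows = sketch_read(csv_path)
```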
Manifest¶
Read-only view onto a run directory’s manifest.yaml with
convenience properties for workflow, config_hash,
config_snapshot, started_at, workflow_api, cache_key,
and inputs.
- class underworld3.workflows.Manifest[source]¶
Bases: object
Wrapper around manifest.yaml contents.
The wrapped data dict is whatever the workflow chose to write; no schema is enforced at this level (the package is at __api_version__ = '0.1'; richer validation comes when a second consumer tightens the contract). Convenience properties expose the keys the convection workflow uses today plus the package-level workflow_api stamp injected by Run.write_manifest().
- classmethod read(run_dir)[source]¶
Load manifest.yaml from run_dir or return None.
- Return type:
Manifest | None
- property workflow¶
- property workflow_api¶
Package __api_version__ recorded at write time, or None for manifests written before the package added the stamp.
- property config_hash¶
- property cache_key: str | None¶
Cache-key digest for the product this manifest describes.
Pre-0.2 manifests don’t carry this; None then means “treat as always-fresh” (the legacy behaviour).
- property inputs: dict¶
Inputs the cache_key was derived from (config + upstream product cache keys), or {} for pre-0.2 manifests.
- property started_at¶
RUN_NAME¶
The filename stem used by Run for its h5/xdmf chain
(<RUN_NAME>.mesh.NNNNN.h5). Defaults to "run".
- underworld3.workflows.RUN_NAME = 'run'¶
Runner¶
WorkflowRunner¶
Resolves the DAG of @workflow_step-decorated functions in a
workflow module, building products on demand and caching them in
memory and on disk via WorkflowProducts.
Key methods:
- build(name) / get(name): return product name, building if needed. Synonym pair.
- build_all(): build every leaf product.
- rebuild(name): invalidate and rebuild.
- invalidate(name): drop a product from cache + disk.
- status(name): "cached", "on_disk", "missing".
- dag(): display steps with status (HTML in Jupyter, plain text in a terminal).
- diagram(): Graphviz DOT source for the runner’s DAG with per-product status colours.
- observe(callback): register a hook fired on cache/load/build events.
- what_invalidates(name): set of products that would rebuild if name changed.
- class underworld3.workflows.WorkflowRunner[source]¶
Bases: object
Resolve and run workflow steps in dependency order.
- Parameters:
module (module) – A workflow module (e.g. import convection_config as convection).
config (WorkflowConfig) – Configuration object passed to every step whose signature includes a config parameter.
products (WorkflowProducts, optional) – Persistence layer. If omitted, products live in memory only.
Examples
>>> runner = WorkflowRunner(convection, config, products)
>>> runner.build("evolution_log")  # resolves all dependencies
>>> runner.dag()  # show steps + status
>>> runner.rebuild("temperature_initial")  # invalidate + rebuild
- get(name)[source]¶
Return product name, building (and caching) if needed.
When the config supports cache_key-based freshness tracking, a disk-cached product is only returned if its recorded cache key matches what the current config expects; mismatch triggers a rebuild. Configs without _identity_fields declared fall back to existence-based caching (legacy behaviour).
- Parameters:
name (str)
- build(name)¶
Return product name, building (and caching) if needed.
When the config supports cache_key-based freshness tracking, a disk-cached product is only returned if its recorded cache key matches what the current config expects; mismatch triggers a rebuild. Configs without _identity_fields declared fall back to existence-based caching (legacy behaviour).
- Parameters:
name (str)
- build_all()[source]¶
Build every leaf product (one nothing else requires).
Returns the list of leaf product names that were built.
- invalidate(name)[source]¶
Drop name from cache and remove its on-disk product if any.
Does not invalidate downstream products. Caller is responsible for rebuilding anything that depended on this.
- Parameters:
name (str)
- observe(callback)[source]¶
Register a callback fired on product cache/load/build events.
Each registered callback is invoked as callback(event, name) where event is one of:
- "cached": returned from in-memory cache
- "loaded": read from on-disk products
- "building": about to run the producing step
- "built": step finished, product cached and (if persistable) saved
Callbacks that raise are silently ignored so a buggy observer can’t break the runner.
Used by interactive UI layers (Jupyter widgets, panel dashboards) that want to display DAG progress without polling.
- Return type:
None
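The observer contract (callbacks invoked as callback(event, name), with exceptions swallowed) can be sketched in a few lines. The class below is a hypothetical stand-in for the relevant part of the runner, not its actual implementation.

```python
class SketchEvents:
    """Minimal observer registry: callbacks get (event, name); errors are swallowed."""

    def __init__(self):
        self._callbacks = []

    def observe(self, callback):
        self._callbacks.append(callback)

    def emit(self, event, name):
        for callback in self._callbacks:
            try:
                callback(event, name)
            except Exception:
                # A faulty observer must not interrupt the build.
                pass


events = SketchEvents()
log = []


def broken(event, name):
    raise RuntimeError("observer bug")


events.observe(broken)  # raises on every event, but is ignored
events.observe(lambda event, name: log.append((event, name)))
events.emit("building", "mesh")
events.emit("built", "mesh")
```

Swallowing exceptions keeps a UI bug from aborting a long build, at the cost of hiding observer errors; an observer that needs to surface its own failures has to log them itself.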
- diagram(*, rankdir='LR')[source]¶
Generate a Graphviz DOT diagram of this runner’s DAG.
Nodes are colour-coded by current status (cached / on-disk / missing) using status() as the status provider.
Returns the DOT source string; render with the dot command:
Path("dag.dot").write_text(runner.diagram())
# then in a shell: dot -Tpng dag.dot -o dag.png
Or use underworld3.workflows.render() for a one-call wrapper that invokes dot directly.
- what_invalidates(name)[source]¶
Set of products that would rebuild if name changed.
Walks the produces/requires DAG forward from name and returns every product (transitively) that lists name in its requires chain. Useful for UI layers showing “if you change X, these will rebuild”.
name itself is not included in the returned set.
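Dependency-ordered building and the forward walk behind what_invalidates can both be shown on a toy graph. The sketch below is a standalone illustration, not WorkflowRunner itself: the step table format, the class name and the in-memory-only cache are assumptions, and the graph is assumed to be acyclic.

```python
class SketchRunner:
    """Toy resolver over {product: (requires, builder)}; in-memory cache only."""

    def __init__(self, steps):
        self.steps = steps
        self.cache = {}
        self.build_order = []

    def get(self, name):
        if name in self.cache:
            return self.cache[name]
        requires, builder = self.steps[name]
        inputs = [self.get(dep) for dep in requires]  # depth-first resolution
        self.cache[name] = builder(*inputs)
        self.build_order.append(name)
        return self.cache[name]

    def what_invalidates(self, name):
        """Every product that transitively requires name (name itself excluded)."""
        affected, frontier = set(), [name]
        while frontier:
            current = frontier.pop()
            for product, (requires, _) in self.steps.items():
                if current in requires and product not in affected:
                    affected.add(product)
                    frontier.append(product)
        return affected


steps = {
    "mesh": ([], lambda: "mesh"),
    "temperature_initial": (["mesh"], lambda m: f"T0[{m}]"),
    "evolution_log": (["mesh", "temperature_initial"],
                      lambda m, t: f"log[{m},{t}]"),
}
runner = SketchRunner(steps)
result = runner.get("evolution_log")       # builds mesh and temperature_initial first
downstream = runner.what_invalidates("mesh")
```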
Products¶
WorkflowProducts¶
The on-disk persistence layer. Type-aware save/load (Mesh →
HDF5, Run → directory pointer, Path → file pointer, ndarray →
NPZ, …). Maintains a YAML manifest with per-product cache keys
and input audit.
- class underworld3.workflows.WorkflowProducts[source]¶
Bases: object
Manages workflow product persistence (make-like).
Products are named objects (meshes, variables, surfaces) saved to a products directory. Each product has a type-aware save/load method. A YAML manifest tracks what exists.
- Parameters:
config (WorkflowConfig, optional) – If provided, products_dir defaults to config.output_dir / "products".
products_dir (str or Path, optional) – Explicit products directory. Overrides config.
- property products_dir¶
- save(name, obj, *, cache_key=None, inputs=None, **metadata)[source]¶
Save a product to disk.
Type dispatch:
- Mesh: HDF5 via write_timestep() (vertex-based, portable)
- MeshVariable: HDF5 via write_timestep() with the mesh
- Surface: VTK via .save()
- SurfaceCollection: directory of VTK files
- dict of Surfaces: directory of VTK files
- ndarray: .npz
- Parameters:
name (str) – Product name (used as filename stem and manifest key).
obj (object) – The object to save.
cache_key (str, optional) – Hex digest of this product’s inputs. Used by fresh() to decide whether a future caller’s expected inputs match the cached version. Pre-0.2 callers pass nothing here; the entry is then treated as “always fresh” on read (legacy behaviour).
inputs (dict, optional) – The inputs cache_key was derived from, typically {"config": {...}, "requires": {upstream_name: cache_key}}. Persisted alongside the digest so future readers can audit what changed.
**metadata – Extra key-value pairs stored in the manifest entry.
- load(name, mesh=None, mesh_variable=None)[source]¶
Load a product by name.
- Parameters:
- Returns:
The loaded product.
- Return type:
- Raises:
KeyError – If the product is not in the manifest.
FileNotFoundError – If product files are missing from disk.
- cache_key_for(name)[source]¶
Return the cache key recorded for product name.
None if the product has never been saved, or if it was saved before cache-key tracking existed (pre-0.2 manifest). Callers that treat None as “fresh” preserve the legacy behaviour.
- fresh(name, expected_cache_key)[source]¶
True iff name exists with the expected cache key.
Used by WorkflowRunner to decide whether a cached product is still valid for the current inputs. Mismatched cache key ⇒ rebuild; missing entry ⇒ rebuild; pre-0.2 manifest entry (no cache_key recorded) ⇒ treated as fresh (legacy behaviour, not stricter than today).
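The three freshness cases listed for fresh() reduce to a short decision function. The sketch below operates on a plain dict standing in for the manifest; the dict layout and the function name are assumptions, not the package's storage format.

```python
def sketch_fresh(manifest, name, expected_cache_key):
    """Freshness rule over a plain dict manifest (hypothetical layout)."""
    entry = manifest.get(name)
    if entry is None:
        return False  # missing entry: rebuild
    recorded = entry.get("cache_key")
    if recorded is None:
        return True  # legacy entry without a recorded key: treated as fresh
    return recorded == expected_cache_key  # mismatch: rebuild


manifest = {
    "mesh": {"cache_key": "abc123"},
    "legacy_field": {},  # saved before cache keys were recorded
}
```

With this manifest, sketch_fresh(manifest, "mesh", "abc123") is the only call that matches on key; a legacy entry is accepted whatever key the caller expects, which is the permissive legacy behaviour the text describes.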
- list()[source]¶
Display available products.
In Jupyter renders as an HTML table; falls back to plain text.
- status(module=None)[source]¶
Show which products exist vs need building.
If module is provided (a workflow module with @workflow_step decorated functions), cross-references product names from step metadata against the manifest.
- Parameters:
module (module, optional) – A workflow module to scan for produces/requires metadata.
CLI helper¶
cli_from_config¶
Auto-derive an argparse parser from a WorkflowConfig subclass.
Every Pydantic field becomes a CLI flag, with type-aware mapping:
| Pydantic type | CLI form |
|---|---|
| | |
| | typed value |
| | |
| anything else | silently skipped |
Most workflows use this to build their CLI driver in five lines:
from underworld3.workflows import cli_from_config, config_from_args
import my_workflow_config as mw
parser = cli_from_config(mw.MyConfig)
parser.add_argument("--no-evolve", action="store_true")
args = parser.parse_args()
config = config_from_args(mw.MyConfig, args)
- underworld3.workflows.cli_from_config(ConfigClass, *, description=None, hidden_fields=frozenset({'description', 'workflow_name'}))[source]¶
Build an argparse parser auto-derived from ConfigClass’ fields.
- Parameters:
ConfigClass (type) – A WorkflowConfig subclass (or any Pydantic BaseModel with model_fields).
description (str, optional) – Parser description. Defaults to the first line of the class’s docstring, if any.
hidden_fields (iterable of str) – Field names that should not be exposed on the CLI. Defaults to {"workflow_name", "description"}, the metadata about the workflow itself. Callers can pass their own list if they want to suppress (or expose) different fields.
- Returns:
parser – Pre-populated with one argument per recognised Pydantic field. Callers can add_argument further (e.g. action flags like --movies or --no-evolve) before parsing.
- Return type:
- underworld3.workflows.config_from_args(ConfigClass, args, *, hidden_fields=frozenset({'description', 'workflow_name'}))[source]¶
Construct a ConfigClass instance from parsed CLI args.
None values (i.e. fields the user didn’t override on the command line) drop through to the config’s default. Fields not present in args are silently ignored.
- Parameters:
args (Namespace)
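The parser-derivation idea, and the way None values fall through to config defaults, can be sketched with argparse and a standard-library dataclass standing in for the Pydantic model. Everything here is an assumption for illustration: DemoConfig, the helper names, the flag spelling (--field_name) and the set of supported types are not taken from the package.

```python
import argparse
from dataclasses import dataclass, fields


@dataclass
class DemoConfig:
    """Hypothetical config used only for this sketch."""
    workflow_name: str = "demo"
    rayleigh: float = 1e4
    resolution: int = 32


HIDDEN = frozenset({"workflow_name"})
# Accept both real types and their string names (postponed annotations).
TYPE_MAP = {int: int, float: float, str: str, "int": int, "float": float, "str": str}


def sketch_cli(config_class):
    parser = argparse.ArgumentParser(description=(config_class.__doc__ or "").strip())
    for f in fields(config_class):
        cast = TYPE_MAP.get(f.type)
        if f.name in HIDDEN or cast is None:
            continue  # hidden or unsupported fields get no flag
        # default=None marks "not given" so the config default can apply later
        parser.add_argument(f"--{f.name}", type=cast, default=None)
    return parser


def sketch_config_from_args(config_class, args):
    names = {f.name for f in fields(config_class)}
    # Keep only known fields the user actually set; extras such as no_evolve are ignored
    overrides = {k: v for k, v in vars(args).items() if k in names and v is not None}
    return config_class(**overrides)


parser = sketch_cli(DemoConfig)
parser.add_argument("--no-evolve", action="store_true")
args = parser.parse_args(["--rayleigh", "1e5"])
config = sketch_config_from_args(DemoConfig, args)
```

Using None as the parser default, rather than copying the config default into argparse, keeps a single source of truth for defaults in the config class.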
DAG diagrams¶
diagram¶
Generate a Graphviz DOT source string for a workflow module’s
produces/requires graph. Pass to dot -Tpng (or -Tsvg /
-Tpdf) to render.
- underworld3.workflows.diagram(module, status_provider=None, *, title=None, rankdir='LR')[source]¶
Generate a Graphviz DOT source string for module’s DAG.
- Parameters:
module (module) – A workflow module (e.g. import convection_config as cc; pass cc).
status_provider (callable, optional) – status_provider(name) → status string for product name. Recognised statuses (and their fill colours) are listed in _STATUS_COLOURS; anything else falls through to the default “missing” grey. Typically wired to WorkflowRunner.status().
title (str, optional) – Graph title. Defaults to module.__name__.
rankdir (str) – Graphviz rankdir: "LR" (left→right, default) or "TB" (top→bottom).
- Returns:
dot_source – Compile with dot -Tpng input.dot -o output.png (or -Tsvg) to render the diagram.
- Return type:
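Generating DOT source from a produces/requires graph is mostly string assembly. The sketch below takes a plain mapping instead of a workflow module; the function name and the colour palette are assumptions (the real colours live in _STATUS_COLOURS, whose contents are not shown here).

```python
SKETCH_COLOURS = {"cached": "palegreen", "on_disk": "lightblue"}  # hypothetical palette
DEFAULT_COLOUR = "lightgrey"  # used for "missing" and any unrecognised status


def sketch_dot(requires_by_product, status_provider=None, *,
               title="workflow", rankdir="LR"):
    """Emit DOT source with one node per product and one edge per dependency."""
    lines = [f'digraph "{title}" {{', f"  rankdir={rankdir};"]
    for product in requires_by_product:
        status = status_provider(product) if status_provider else "missing"
        colour = SKETCH_COLOURS.get(status, DEFAULT_COLOUR)
        lines.append(f'  "{product}" [style=filled, fillcolor="{colour}"];')
    for product, requires in requires_by_product.items():
        for upstream in requires:
            lines.append(f'  "{upstream}" -> "{product}";')  # edge follows data flow
    lines.append("}")
    return "\n".join(lines)


graph = {"mesh": [], "temperature_initial": ["mesh"]}
dot_source = sketch_dot(
    graph,
    status_provider=lambda name: "cached" if name == "mesh" else "missing",
)
```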
render¶
One-call wrapper around diagram plus the dot binary —
generates a rendered PNG/SVG/PDF directly. Requires Graphviz
on PATH.
- underworld3.workflows.render(module, output_path, *, format=None, status_provider=None, rankdir='LR')[source]¶
Render module’s DAG to an image file via the dot command.
Convenience wrapper around diagram() plus subprocess. Requires the dot binary on PATH (apt install graphviz, brew install graphviz, etc.).
- Parameters:
module (module) – A workflow module.
output_path (str or Path) – Destination file. Format inferred from the extension (.png, .svg, .pdf) unless overridden by format.
format (str, optional) – Graphviz output format, e.g. "png", "svg", "pdf". Inferred from output_path extension if not given.
status_provider (Callable[[str], str] | None) – Forwarded to diagram().
- Returns:
output_path – Resolved path of the written image.
- Return type:
Path
- Raises:
FileNotFoundError – If the dot command is not on PATH.
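A wrapper of this kind typically checks for the binary, infers the format from the file extension and pipes the DOT source to dot on standard input. The following is a sketch under those assumptions, not the package's render(): it takes DOT source rather than a module, and the dot_binary parameter is added here purely so the missing-binary path can be exercised.

```python
import shutil
import subprocess
from pathlib import Path


def sketch_render(dot_source, output_path, *, format=None, dot_binary="dot"):
    """Pipe DOT source through Graphviz and return the resolved output path."""
    output_path = Path(output_path)
    fmt = format or output_path.suffix.lstrip(".")  # infer format from the extension
    if shutil.which(dot_binary) is None:
        raise FileNotFoundError(f"'{dot_binary}' not found on PATH; install Graphviz")
    # dot reads the graph from stdin when no input file is given
    subprocess.run(
        [dot_binary, f"-T{fmt}", "-o", str(output_path)],
        input=dot_source,
        text=True,
        check=True,
    )
    return output_path.resolve()
```

Checking with shutil.which before calling subprocess gives a clearer error message than the one subprocess raises on its own, and check=True turns a Graphviz syntax error into a CalledProcessError instead of a silently missing image.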
Discovery¶
view¶
Display the workflow steps and config classes in a workflow module. Convenient inside a Jupyter notebook (renders an HTML table); falls back to plain text in a terminal.
- underworld3.workflows.view(module)[source]¶
Display the workflow steps defined in module.
Scans module for functions decorated with @workflow_step and lists them with their descriptions. In Jupyter this renders as an HTML table; in a terminal as plain text.
- Parameters:
module (module) – A workflow module (e.g. import convection_config as convection; uw.workflows.view(convection)).
list_workflows / init_workflow¶
Discover available workflows on the system; scaffold a new
workflow into a target directory. See
underworld3.workflows.scaffold for details.
Utilities¶
check_dependencies¶
Check that optional packages a workflow needs are installed; emit
a clear error with install instructions if not. Use at workflow
module top so users don’t see a confusing ImportError deep in a
solve.
- underworld3.workflows.check_dependencies(packages)[source]¶
Check that optional packages are importable; raise with install hints.
- Parameters:
packages (dict) – Mapping of import name to install instruction, e.g. {"geopandas": "pip install geopandas"}.
- Raises:
ImportError – If any package cannot be imported.
- Return type:
None
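An import check with install hints can be written with importlib alone. The sketch below mirrors the described contract (a mapping of import name to install instruction, ImportError on failure); the function name, the message format and the choice to report all missing packages at once are assumptions.

```python
import importlib


def sketch_check_dependencies(packages):
    """Raise ImportError listing install hints for every package that fails to import."""
    missing = []
    for import_name, hint in packages.items():
        try:
            importlib.import_module(import_name)
        except ImportError:
            missing.append(f"{import_name}: {hint}")
    if missing:
        raise ImportError("Missing optional packages:\n  " + "\n  ".join(missing))


# A standard-library module is always importable, so this passes silently.
sketch_check_dependencies({"json": "part of the standard library"})
```

Collecting every failure before raising lets a user install all missing packages in one go instead of discovering them one ImportError at a time.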
parse_quantity¶
Parse a quantity string ("50 km", "1e21 Pa*s") into a
uw.quantity. Used internally by WorkflowConfig.setup_model.
show_source¶
Display the source of a workflow function. Useful for notebook-side introspection without leaving the cell.
- underworld3.workflows.show_source(fn)[source]¶
Display the source code of fn with syntax highlighting.
In Jupyter the source is rendered as a Python code block via IPython.display.Markdown. In a terminal it falls back to plain print.
Works on any function or method, not just workflow steps.
- Parameters:
fn (callable) – The function whose source you want to inspect.
- Return type:
None