"""WorkflowProducts — make-like product persistence for workflow pipelines.
Maps named products (meshes, variables, surfaces) to files on disk.
A YAML manifest tracks what exists, enabling selective rebuild and
parameter-study workflows where expensive products are reused.
Usage::
products = WorkflowProducts(config)
# Save after expensive steps
products.save("adapted_mesh", mesh)
products.save("fault_surfaces", surface_collection)
# Later — reload without recomputing
mesh = products.load("adapted_mesh")
surfaces = products.load("fault_surfaces", mesh=mesh)
# Check what's available
products.list()
products.status(h2ex_module)
"""
import inspect
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import numpy as np
import yaml
# ── Type detection ──────────────────────────────────────────────────
def _detect_type(obj):
    """Classify *obj* and return the string tag used for save dispatch.

    The checks run in a fixed order and the first match wins. Possible
    results are ``"run_directory"``, ``"file"``, ``"mesh"``,
    ``"mesh_variable"``, ``"surface_collection"``, ``"surface"``,
    ``"ndarray"`` and ``"surface_dict"``; anything that matches none of
    them is tagged ``"unknown"``.
    """
    # Project-local import kept inside the function, as in the original.
    from ._run import Run as _Run

    obj_class = type(obj)
    class_name = obj_class.__name__
    module_name = obj_class.__module__ or ""

    # A Run is tested before Path so it can never be picked up by one of
    # the later, looser checks.
    if isinstance(obj, _Run):
        return "run_directory"

    # A bare Path means "the producing step already wrote this file".
    if isinstance(obj, Path):
        return "file"

    # Mesh: matched by class name (or module + name), and only accepted
    # when the object also exposes ``write_checkpoint``. An object that
    # merely looks like a mesh by name keeps falling through the
    # remaining checks.
    named_like_mesh = class_name in ("Mesh", "MeshClass") or (
        "discretisation" in module_name and "Mesh" in class_name
    )
    if named_like_mesh and hasattr(obj, "write_checkpoint"):
        return "mesh"

    # MeshVariable: recognised purely by the two methods it provides.
    if hasattr(obj, "load_from_h5_plex_vector") and hasattr(obj, "save"):
        return "mesh_variable"

    # SurfaceCollection / Surface: class name plus one marker attribute.
    if class_name == "SurfaceCollection" and hasattr(obj, "surfaces"):
        return "surface_collection"
    if class_name == "Surface" and hasattr(obj, "pv_mesh"):
        return "surface"

    if isinstance(obj, np.ndarray):
        return "ndarray"

    # Non-empty dict whose first value is a Surface (only the first
    # value is inspected).
    if isinstance(obj, dict) and obj:
        sample = next(iter(obj.values()))
        if type(sample).__name__ == "Surface":
            return "surface_dict"

    return "unknown"
# ── Manifest helpers ────────────────────────────────────────────────
def _load_manifest(path):
if path.exists():
with open(path) as f:
return yaml.safe_load(f) or {}
return {}
def _save_manifest(path, manifest):
    """Write *manifest* to *path* as block-style YAML.

    Parameters
    ----------
    path : Path
        Destination file; its parent directory is created if needed.
    manifest : dict
        Manifest content. Key order is preserved (``sort_keys=False``).

    Raises
    ------
    yaml.YAMLError
        If *manifest* contains a value ``yaml.safe_dump`` cannot
        represent. The existing manifest file is left untouched in
        that case.
    """
    # The manifest is always read back with ``yaml.safe_load``. Writing
    # it with the unsafe dumper would emit ``!!python/...`` tags for
    # values such as tuples or numpy scalars, and a single such value
    # makes the *whole* manifest unreadable afterwards. ``safe_dump``
    # writes tuples as plain lists and rejects anything it cannot
    # represent.
    #
    # Serialise before opening the file: ``open(..., "w")`` truncates
    # immediately, so a failure during dumping must happen first.
    text = yaml.safe_dump(manifest, default_flow_style=False, sort_keys=False)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        f.write(text)
# ── Main class ──────────────────────────────────────────────────────
[docs]
class WorkflowProducts:
    """Manages workflow product persistence (make-like).

    Products are named objects (meshes, variables, surfaces) saved to a
    products directory. Each product has a type-aware save/load method.
    A YAML manifest (``manifest.yaml`` inside the products directory)
    tracks what exists.

    Parameters
    ----------
    config : WorkflowConfig, optional
        If provided, ``products_dir`` defaults to
        ``config.output_dir / "products"``.
    products_dir : str or Path, optional
        Explicit products directory. Overrides config. When neither
        argument is given, the relative directory ``products`` is used.
    """
[docs]
def __init__(self, config=None, products_dir=None):
if products_dir is not None:
self._dir = Path(products_dir)
elif config is not None:
self._dir = Path(config.output_dir) / "products"
else:
self._dir = Path("products")
self._dir.mkdir(parents=True, exist_ok=True)
self._manifest_path = self._dir / "manifest.yaml"
@property
def products_dir(self) -> Path:
    """Directory holding the product files and the manifest (read-only)."""
    return self._dir
# ── Save ────────────────────────────────────────────────────────
[docs]
def save(self, name: str, obj, *, cache_key=None, inputs=None, **metadata):
    """Save a product to disk and record it in the manifest.

    Type dispatch (decided by ``_detect_type``):

    - **Mesh** — HDF5 via ``write_timestep()`` (vertex-based, portable)
    - **MeshVariable** — HDF5 via ``write_timestep()`` with the mesh;
      the variable's ``clean_name`` is stored as ``var_name`` metadata
    - **Surface** — VTK via ``.save()``
    - **SurfaceCollection** — directory of VTK files
    - **dict of Surfaces** — directory of VTK files
    - **ndarray** — ``.npz`` (array stored under the key ``data``)
    - **Path** — nothing is written; the path string is recorded as given
    - **Run** — nothing is written; ``str(obj.path)`` is recorded
    - **anything else** — ``<name>.yaml``, recorded with type ``yaml``

    Parameters
    ----------
    name : str
        Product name (used as filename stem and manifest key).
    obj : object
        The object to save.
    cache_key : str, optional
        Hex digest of this product's inputs. Used by
        :meth:`fresh` to decide whether a future caller's expected
        inputs match the cached version. Pre-0.2 callers pass
        nothing here; the entry is then treated as "always fresh"
        on read (legacy behaviour).
    inputs : dict, optional
        The inputs *cache_key* was derived from — typically
        ``{"config": {...}, "requires": {upstream_name: cache_key}}``.
        Persisted alongside the digest so future readers can audit
        what changed.
    **metadata
        Extra key-value pairs stored in the manifest entry.

    Warns
    -----
    UserWarning
        When the YAML fallback has to write Python-specific tags,
        because :meth:`load` will not be able to read that file back.
    """
    product_type = _detect_type(obj)
    files = []
    if product_type == "mesh":
        files = self._save_mesh(name, obj)
    elif product_type == "mesh_variable":
        files = self._save_mesh_variable(name, obj)
        # Remembered so load() can ask read_timestep for the same name.
        metadata["var_name"] = obj.clean_name
    elif product_type == "surface":
        files = self._save_surface(name, obj)
    elif product_type == "surface_collection":
        files = self._save_surface_collection(name, obj)
    elif product_type == "surface_dict":
        files = self._save_surface_dict(name, obj)
    elif product_type == "ndarray":
        fname = f"{name}.npz"
        np.savez(self._dir / fname, data=obj)
        files = [fname]
    elif product_type == "file":
        # The producer already wrote the file; just record the
        # path. Stored as a string so the YAML manifest stays
        # round-trippable.
        files = [str(obj)]
    elif product_type == "run_directory":
        # The producer wrote a complete time-loop run directory
        # (manifest.yaml + h5 chain + timeseries.csv + ...). Just
        # record its path.
        files = [str(obj.path)]
    else:
        # YAML fallback. load() reads this file with yaml.safe_load,
        # so write it with safe_dump whenever possible: the unsafe
        # dumper emits ``!!python/...`` tags (e.g. for tuples) that
        # safe_load rejects, which made such products impossible to
        # reload. safe_dump writes tuples as plain lists instead.
        import warnings

        fname = f"{name}.yaml"
        try:
            text = yaml.safe_dump(obj, default_flow_style=False)
        except yaml.YAMLError:
            # Keep the legacy best-effort behaviour rather than
            # failing the save, but say that the file is write-only.
            warnings.warn(
                f"Product '{name}' contains values that are not plain "
                f"YAML; it was written with Python-specific tags and "
                f"load() (which uses yaml.safe_load) will not be able "
                f"to read it back.",
                stacklevel=2,
            )
            text = yaml.dump(obj, default_flow_style=False)
        with open(self._dir / fname, "w") as f:
            f.write(text)
        files = [fname]
        product_type = "yaml"

    # Update manifest
    manifest = _load_manifest(self._manifest_path)
    entry = {
        "type": product_type,
        "files": files,
        "saved_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
    }
    # Optional keys are only written when supplied, so entries saved
    # without a cache key look like pre-cache-key entries.
    if cache_key is not None:
        entry["cache_key"] = cache_key
    if inputs is not None:
        entry["inputs"] = inputs
    if metadata:
        entry["metadata"] = metadata
    manifest[name] = entry
    _save_manifest(self._manifest_path, manifest)
def _save_mesh(self, name, mesh):
"""Save a mesh via write_timestep (vertex-based, portable HDF5+XDMF)."""
mesh.write_timestep(
name, index=0, outputPath=str(self._dir),
meshVars=[], swarmVars=[], meshUpdates=True,
)
# write_timestep creates <outputPath>/<name>.mesh.00000.h5
return [f"{name}.mesh.00000.h5"]
def _save_mesh_variable(self, name, var):
"""Save a MeshVariable via write_timestep.
Writes the mesh geometry and the variable's vertex values to
separate HDF5 files with XDMF metadata. On reload,
``read_timestep`` uses kd-tree interpolation so the variable
can even be loaded onto a different mesh.
"""
mesh = var.mesh
mesh.write_timestep(
name, index=0, outputPath=str(self._dir),
meshVars=[var], swarmVars=[], meshUpdates=True,
)
# Produces: <name>.mesh.00000.h5 and <name>.mesh.<clean_name>.00000.h5
mesh_h5 = f"{name}.mesh.00000.h5"
var_h5 = f"{name}.mesh.{var.clean_name}.00000.h5"
return [mesh_h5, var_h5]
def _save_surface(self, name, surface):
"""Save a single Surface to VTK."""
fname = f"{name}.vtk"
surface.save(str(self._dir / fname))
return [fname]
def _save_surface_collection(self, name, collection):
"""Save a SurfaceCollection as a directory of VTK files."""
subdir = self._dir / name
subdir.mkdir(parents=True, exist_ok=True)
files = []
for surf_name, surface in collection.surfaces.items():
fname = f"{name}/{surf_name}.vtk"
surface.save(str(self._dir / fname))
files.append(fname)
return files
def _save_surface_dict(self, name, surface_dict):
"""Save a dict of Surfaces as a directory of VTK files."""
subdir = self._dir / name
subdir.mkdir(parents=True, exist_ok=True)
files = []
for surf_name, surface in surface_dict.items():
fname = f"{name}/{surf_name}.vtk"
surface.save(str(self._dir / fname))
files.append(fname)
return files
# ── Load ────────────────────────────────────────────────────────
[docs]
def load(self, name: str, mesh=None, mesh_variable=None):
    """Load a product by name.

    Parameters
    ----------
    name : str
        Product name (must exist in manifest).
    mesh : Mesh, optional
        Passed through when loading Surfaces / surface collections
        that need a mesh reference.
    mesh_variable : MeshVariable, optional
        For ``mesh_variable`` products: the target variable to load
        data into (via ``read_timestep``). Must already exist on a
        mesh.

    Returns
    -------
    object
        The loaded product. ``file`` products return the recorded
        path as a ``Path``; ``run_directory`` products return a
        ``Run`` opened on the recorded directory; ``mesh_variable``
        products return the *mesh_variable* that was passed in.

    Raises
    ------
    KeyError
        If the product is not in the manifest.
    ValueError
        If a ``mesh_variable`` product is requested without
        *mesh_variable*, or the manifest records an unknown type.
    FileNotFoundError
        If product files are missing from disk (surface collections
        skip missing files instead of raising).
    """
    manifest = _load_manifest(self._manifest_path)
    if name not in manifest:
        available = ", ".join(manifest.keys()) or "(none)"
        raise KeyError(
            f"Product '{name}' not found. Available: {available}"
        )
    entry = manifest[name]
    product_type = entry["type"]
    files = entry["files"]
    if product_type == "mesh":
        return self._load_mesh(name, files)
    elif product_type == "mesh_variable":
        if mesh_variable is None:
            raise ValueError(
                f"Loading mesh_variable '{name}' requires "
                f"mesh_variable= argument (the target MeshVariable "
                f"to load data into via read_timestep)"
            )
        return self._load_mesh_variable(name, entry, mesh_variable)
    elif product_type == "surface":
        return self._load_surface(name, files, mesh)
    elif product_type in ("surface_collection", "surface_dict"):
        # Both flavours are stored as a directory of VTK files and
        # both come back as a SurfaceCollection.
        return self._load_surface_collection(name, files, mesh)
    elif product_type == "ndarray":
        # np.load on an .npz returns an NpzFile that keeps the file
        # open until it is closed. Use it as a context manager so the
        # handle is released; the indexed array is read into memory
        # before the archive closes.
        with np.load(self._dir / files[0]) as archive:
            return archive["data"]
    elif product_type == "file":
        # Return the recorded path; the file's content is the
        # producer's responsibility (it lives at this path on disk).
        return Path(files[0])
    elif product_type == "run_directory":
        # Return a Run object pointing at the recorded directory.
        from ._run import Run as _Run
        return _Run.open(files[0])
    elif product_type == "yaml":
        with open(self._dir / files[0]) as f:
            return yaml.safe_load(f)
    else:
        raise ValueError(f"Unknown product type '{product_type}' for '{name}'")
def _load_mesh(self, name, files):
    """Build a Mesh from the first recorded HDF5 file of the product."""
    import underworld3 as uw

    mesh_file = self._dir / files[0]
    return uw.discretisation.Mesh(str(mesh_file))
def _load_mesh_variable(self, name, entry, mesh_variable):
"""Load a MeshVariable saved via write_timestep.
Uses ``read_timestep`` which reads vertex-based HDF5 and
interpolates via kd-tree onto the target variable's DOFs.
Works even if the mesh has changed (kd-tree interpolation).
"""
var_meta = entry.get("metadata", {})
var_name = var_meta.get("var_name", mesh_variable.clean_name)
mesh_variable.read_timestep(
name, var_name, index=0, outputPath=str(self._dir),
)
return mesh_variable
def _load_surface(self, name, files, mesh):
    """Rebuild a Surface from the first recorded VTK file, passing *mesh* on."""
    from underworld3.meshing import Surface

    vtk_file = self._dir / files[0]
    return Surface.from_vtk(str(vtk_file), mesh=mesh)
def _load_surface_collection(self, name, files, mesh):
    """Rebuild a SurfaceCollection from the recorded VTK files.

    Recorded files that are no longer on disk are skipped without an
    error, so the result may hold fewer surfaces than were saved.
    """
    from underworld3.meshing import SurfaceCollection

    loaded = SurfaceCollection()
    for relative in files:
        candidate = self._dir / relative
        if not candidate.exists():
            continue
        loaded.add_from_vtk(str(candidate), mesh=mesh)
    return loaded
# ── Query ───────────────────────────────────────────────────────
[docs]
def exists(self, name: str) -> bool:
    """Return ``True`` when *name* is in the manifest and on disk.

    "On disk" means at least one of the product's recorded files
    exists; a product with no recorded files never counts as existing.
    """
    manifest = _load_manifest(self._manifest_path)
    if name not in manifest:
        return False
    recorded = manifest[name].get("files", [])
    return any((self._dir / relative).exists() for relative in recorded)
[docs]
def cache_key_for(self, name: str) -> Optional[str]:
    """Look up the cache key stored in the manifest for *name*.

    Returns ``None`` both when *name* has no manifest entry and when
    its entry carries no ``cache_key`` (saved before cache-key tracking
    existed, i.e. a pre-0.2 manifest). Callers that treat ``None`` as
    "fresh" keep the legacy behaviour.
    """
    manifest = _load_manifest(self._manifest_path)
    record = manifest.get(name, {})
    return record.get("cache_key")
[docs]
def fresh(self, name: str, expected_cache_key: str) -> bool:
"""``True`` iff *name* exists with the expected cache key.
Used by :class:`WorkflowRunner` to decide whether a cached
product is still valid for the current inputs. Mismatched
cache key ⇒ rebuild; missing entry ⇒ rebuild; pre-0.2
manifest entry (no cache_key recorded) ⇒ treated as fresh
(legacy behaviour, not stricter than today).
"""
if not self.exists(name):
return False
recorded = self.cache_key_for(name)
if recorded is None:
return True
return recorded == expected_cache_key
[docs]
def list(self):
    """Display available products.

    In Jupyter renders as an HTML table; falls back to plain text when
    IPython cannot be imported. Each row shows the product name, its
    recorded type, the save timestamp, and whether at least one of its
    recorded files is present on disk (``yes`` / ``MISSING``).
    """
    from html import escape

    manifest = _load_manifest(self._manifest_path)
    if not manifest:
        try:
            from IPython.display import HTML, display
            display(HTML("<em>No products saved yet.</em>"))
        except ImportError:
            print("No products saved yet.")
        return

    rows = []
    for name, entry in manifest.items():
        on_disk = any(
            (self._dir / f).exists() for f in entry.get("files", [])
        )
        # Manifest values come from YAML and are not guaranteed to be
        # strings (e.g. a numeric product name); coerce once so that
        # len(), format specs and escape() below all behave.
        rows.append((
            str(name),
            str(entry.get("type", "?")),
            str(entry.get("saved_at", "?")),
            "yes" if on_disk else "MISSING",
        ))

    try:
        from IPython.display import HTML, display
        th = (
            '<th style="text-align:left; padding:4px 12px 4px 0; '
            'border-bottom:2px solid #ccc;">'
        )
        # Everything taken from the manifest or the file system is
        # HTML-escaped: product names containing "<" or "&" would
        # otherwise corrupt the rendered table.
        markup = (
            f"<h4>Workflow Products</h4>"
            f'<p style="color:#888;">Directory: <code>{escape(str(self._dir))}</code></p>'
            f'<table style="border-collapse:collapse;">'
            f"<tr>{th}Product</th>{th}Type</th>{th}Saved</th>{th}On Disk</th></tr>"
        )
        for name, ptype, saved, on_disk in rows:
            colour = "#060" if on_disk == "yes" else "#c00"
            td = '<td style="padding:2px 12px 2px 0;'
            markup += (
                f"<tr>"
                f'{td} font-family:monospace;">{escape(name)}</td>'
                f'{td}">{escape(ptype)}</td>'
                f'{td} color:#888;">{escape(saved)}</td>'
                f'{td} color:{colour}; font-weight:bold;">{on_disk}</td>'
                f"</tr>"
            )
        markup += "</table>"
        display(HTML(markup))
    except ImportError:
        # Plain-text fallback: columns sized to the widest entry.
        name_w = max(len(r[0]) for r in rows)
        type_w = max(len(r[1]) for r in rows)
        print(f"Workflow Products ({self._dir})")
        print(f" {'Product':<{name_w}} {'Type':<{type_w}} {'Saved':<24} On Disk")
        print(f" {'-' * name_w} {'-' * type_w} {'-' * 24} -------")
        for name, ptype, saved, on_disk in rows:
            print(f" {name:<{name_w}} {ptype:<{type_w}} {saved:<24} {on_disk}")
[docs]
def status(self, module=None):
    """Show which products exist vs need building.

    If *module* is provided, every callable member flagged with
    ``_is_workflow_step`` contributes the names in its
    ``workflow_produces`` attribute, and those names are
    cross-referenced against the manifest. Without *module*, the
    products already in the manifest are reported.

    Each product is shown as ``ready`` (in the manifest with at least
    one recorded file on disk), ``MISSING`` (in the manifest but no
    file found) or ``not built`` (not in the manifest).

    Parameters
    ----------
    module : module, optional
        A workflow module to scan for ``workflow_produces`` metadata.
    """
    from html import escape

    manifest = _load_manifest(self._manifest_path)

    # Gather all product names from module steps
    all_products = set()
    if module is not None:
        for _name, obj in inspect.getmembers(module, callable):
            if getattr(obj, "_is_workflow_step", False):
                all_products.update(getattr(obj, "workflow_produces", []))
    else:
        all_products = set(manifest.keys())

    if not all_products:
        print("No products defined.")
        return

    rows = []
    for product in sorted(all_products):
        if product in manifest:
            on_disk = any(
                (self._dir / f).exists()
                for f in manifest[product].get("files", [])
            )
            status_str = "ready" if on_disk else "MISSING"
            saved = manifest[product].get("saved_at", "")
        else:
            status_str = "not built"
            saved = ""
        # Coerce to str: names and timestamps read from YAML are not
        # guaranteed to be strings, and len()/escape() below need them.
        rows.append((str(product), status_str, str(saved)))

    try:
        from IPython.display import HTML, display
        th = (
            '<th style="text-align:left; padding:4px 12px 4px 0; '
            'border-bottom:2px solid #ccc;">'
        )
        markup = (
            f"<h4>Product Status</h4>"
            f'<table style="border-collapse:collapse;">'
            f"<tr>{th}Product</th>{th}Status</th>{th}Saved</th></tr>"
        )
        for product, status_str, saved in rows:
            if status_str == "ready":
                colour, icon = "#060", "\u2713"
            elif status_str == "MISSING":
                colour, icon = "#c00", "\u2717"
            else:
                colour, icon = "#888", "\u2022"
            td = '<td style="padding:2px 12px 2px 0;'
            # Product names and timestamps are escaped so characters
            # such as "<" or "&" cannot break the table markup.
            markup += (
                f"<tr>"
                f'{td} font-family:monospace;">{escape(product)}</td>'
                f'{td} color:{colour}; font-weight:bold;">{icon} {status_str}</td>'
                f'{td} color:#888;">{escape(saved)}</td>'
                f"</tr>"
            )
        markup += "</table>"
        display(HTML(markup))
    except ImportError:
        # Plain-text fallback when IPython is unavailable.
        name_w = max(len(r[0]) for r in rows)
        print("Product Status:")
        for product, status_str, saved in rows:
            mark = "[x]" if status_str == "ready" else "[ ]"
            print(f" {mark} {product:<{name_w}} {status_str} {saved}")
[docs]
def remove(self, name: str):
    """Remove a product from disk and the manifest.

    Every path recorded for the product is deleted: regular files (and
    symlinks) are unlinked, recorded directories are removed
    recursively. An emptied ``<products_dir>/<name>`` sub-directory is
    removed as well. Unknown product names are ignored.

    Parameters
    ----------
    name : str
        Product name to remove.
    """
    import shutil

    manifest = _load_manifest(self._manifest_path)
    if name not in manifest:
        return
    entry = manifest.pop(name)
    for f in entry.get("files", []):
        p = self._dir / f
        if not p.exists():
            continue
        if p.is_dir() and not p.is_symlink():
            # ``run_directory`` products record a directory, on which
            # Path.unlink() raises. That made remove() and clear() fail
            # before the manifest was updated.
            shutil.rmtree(p)
        else:
            p.unlink()
    # Clean up empty subdirectories
    subdir = self._dir / name
    if subdir.is_dir() and not any(subdir.iterdir()):
        subdir.rmdir()
    _save_manifest(self._manifest_path, manifest)
[docs]
def clear(self):
    """Remove every product listed in the manifest, files included."""
    # Snapshot the names first: each remove() call rewrites the manifest.
    product_names = tuple(_load_manifest(self._manifest_path))
    for product_name in product_names:
        self.remove(product_name)