"""Run-directory primitives for ``underworld3.workflows``.
Owns the bytes-on-disk for a workflow run:
- ``manifest.yaml`` — run identity (workflow name, config hash,
``workflow_api`` version stamp).
- ``timeseries.csv`` — append-only per-step diagnostics with
schema-migrating writes ('---' for NaN).
- ``run_summary.yaml`` — steady-state "done" marker.
- discovery of the ``run.mesh.NNNNN.{h5,xdmf}`` checkpoint chain
(the actual h5 read/write goes through ``mesh.write_timestep`` and
``var.read_timestep`` — :meth:`Run.load_field` wraps the read side).
These types are intentionally minimal — pure adapters around the
on-disk format used by the Rayleigh-Bénard convection example. Once a
second consumer exercises the API, design-memo additions such as a
``Run.append_step(t, dt, fields, diags)`` signature (today's
:meth:`Run.append_step` takes explicit ``step``, ``mesh``,
``mesh_vars`` and CSV-column arguments) or ``Run.create(parameters=,
scripts=)`` move into the public surface and the package bumps to
``__api_version__ = '1.0'``.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import numpy as np
import yaml
# Filename stem for the h5 / xdmf checkpoint chain. The convection
# workflow has always used "run"; held as a module constant so the
# default works without per-call configuration. It is read by
# Run.steps (xdmf discovery), Run.load_field (h5 read) and
# Run.append_step (h5 write). A future workflow that wants a different
# stem will need this lifted to a Run-instance attribute or a per-call
# argument.
RUN_NAME: str = "run"
# ---------------------------------------------------------------------------
# Manifest
# ---------------------------------------------------------------------------
@dataclass
class Manifest:
    """Wrapper around ``manifest.yaml`` contents.

    The wrapped ``data`` dict is whatever the workflow chose to write —
    no schema is enforced at this level (the package is at
    ``__api_version__ = '0.1'``; richer validation comes when
    a second consumer tightens the contract). Convenience properties
    expose the keys the convection workflow uses today plus the
    package-level ``workflow_api`` stamp injected by
    :meth:`Run.write_manifest`.
    """

    data: dict = field(default_factory=dict)

    @classmethod
    def read(cls, run_dir) -> Optional["Manifest"]:
        """Load ``manifest.yaml`` from *run_dir* or return ``None``.

        An empty (or otherwise falsy) YAML document reads back as an
        empty manifest.

        Raises
        ------
        ValueError
            If the file holds a YAML document that is not a mapping
            (e.g. a bare list or scalar) — none of the accessors below
            could interpret it, so fail here with the file path rather
            than later with an opaque ``AttributeError``.
        """
        p = Path(run_dir) / "manifest.yaml"
        if not p.exists():
            return None
        with open(p, encoding="utf-8") as f:
            loaded = yaml.safe_load(f) or {}
        if not isinstance(loaded, dict):
            raise ValueError(
                f"{p}: expected a YAML mapping, got {type(loaded).__name__}"
            )
        return cls(loaded)

    def write(self, run_dir) -> None:
        """Write ``manifest.yaml`` into *run_dir*.

        The YAML is first written to a sibling ``manifest.yaml.tmp`` and
        then moved over the target with :meth:`pathlib.Path.replace`, so
        an interrupted write cannot leave a truncated manifest behind
        (the final rename is atomic on POSIX filesystems).
        """
        p = Path(run_dir) / "manifest.yaml"
        tmp = p.with_name(p.name + ".tmp")
        with open(tmp, "w", encoding="utf-8") as f:
            yaml.dump(self.data, f, default_flow_style=False, sort_keys=False)
        tmp.replace(p)

    # Dict-like access for code that just wants the raw fields.
    def __getitem__(self, key):
        return self.data[key]

    def get(self, key, default=None):
        """``dict.get`` on the raw manifest fields."""
        return self.data.get(key, default)

    # Convenience properties for the keys the convection workflow uses.
    @property
    def workflow(self):
        return self.data.get("workflow")

    @property
    def workflow_api(self):
        """Package ``__api_version__`` recorded at write time, or ``None``
        for manifests written before the package added the stamp."""
        return self.data.get("workflow_api")

    @property
    def config_hash(self):
        return self.data.get("config_hash")

    @property
    def config_snapshot(self) -> dict:
        """The recorded config snapshot, or ``{}`` when absent.

        ``or {}`` (rather than a ``.get`` default) also covers a key
        that is present but null in the YAML, keeping the ``dict``
        return contract.
        """
        return self.data.get("config_snapshot") or {}

    @property
    def cache_key(self) -> Optional[str]:
        """Cache-key digest for the product this manifest describes.

        Pre-0.2 manifests don't carry this; ``None`` then means "treat
        as always-fresh" (the legacy behaviour).
        """
        return self.data.get("cache_key")

    @property
    def inputs(self) -> dict:
        """Inputs the cache_key was derived from (config + upstream
        product cache keys), or ``{}`` for pre-0.2 manifests (or when
        the key is present but null)."""
        return self.data.get("inputs") or {}

    @property
    def started_at(self):
        return self.data.get("started_at")
# ---------------------------------------------------------------------------
# CSV cell helpers (workflow-agnostic)
# ---------------------------------------------------------------------------
def _csv_to_float(s) -> float:
"""Parse a CSV cell as float; treat ``None`` / ``''`` / ``'---'`` as NaN."""
if s is None:
return float("nan")
s = s.strip()
if s in ("", "---"):
return float("nan")
return float(s)
def _format_for_csv(val):
"""Serialise a value for CSV: ``None`` and non-finite floats → ``'---'``."""
if val is None:
return "---"
if isinstance(val, float) and not np.isfinite(val):
return "---"
return val
# ---------------------------------------------------------------------------
# Run
# ---------------------------------------------------------------------------
class Run:
    """A run-output directory — thin wrapper, no IO on construction.

    Construction does not touch disk; :meth:`Run.create` is the helper
    that makes the directory. Reads (:attr:`manifest`, :attr:`steps`,
    :attr:`timeseries`, :attr:`summary`) all hit disk fresh on each
    access — no caching.
    """

    def __init__(self, path):
        self.path = Path(path)

    # --- construction -----------------------------------------------------

    @classmethod
    def open(cls, path) -> "Run":
        """Open an existing run directory.

        Currently a thin alias for ``Run(path)`` — no validation that
        the path exists or contains a manifest. ``workflow_api`` /
        ``config_hash`` validation is deferred to ``__api_version__ =
        '1.0'``, when the contract is firmed up by a second consumer.
        """
        return cls(path)

    @classmethod
    def create(cls, path) -> "Run":
        """Create the directory (parents + exist_ok) and return a Run."""
        p = Path(path)
        p.mkdir(parents=True, exist_ok=True)
        return cls(p)

    # --- manifest --------------------------------------------------------

    @property
    def manifest(self) -> Optional["Manifest"]:
        """The manifest, or ``None`` if ``manifest.yaml`` doesn't exist."""
        return Manifest.read(self.path)

    def write_manifest(self, data: dict) -> None:
        """Write ``manifest.yaml`` with the given dict.

        Auto-injects two fields if the caller hasn't supplied them:

        - ``workflow_api`` — the current package ``__api_version__``.
        - ``cache_key`` — copied from ``config_hash`` for time-loop
          run-directories where the run's identity is the only input.
          Workflows producing run-directories that depend on upstream
          products should set ``cache_key`` (and ``inputs``) on the
          dict explicitly.

        Manifests written before either stamp existed read back with
        ``manifest.workflow_api is None`` / ``manifest.cache_key is None``.
        """
        # Imported here, not at module level: the package __init__
        # presumably imports this module, so a top-level import would be
        # circular (NOTE(review): confirm).
        from underworld3.workflows import __api_version__

        out = dict(data)
        out.setdefault("workflow_api", __api_version__)
        if "cache_key" not in out and "config_hash" in out:
            out["cache_key"] = out["config_hash"]
        Manifest(out).write(self.path)

    # --- checkpoint discovery -------------------------------------------

    @property
    def steps(self) -> list[int]:
        """Indices of saved timesteps, derived from the xdmf files."""
        if not self.path.exists():
            return []
        steps = set()
        for p in self.path.glob(f"{RUN_NAME}.mesh.[0-9]*.xdmf"):
            # "run.mesh.00010.xdmf" -> stem "run.mesh.00010" -> 10.
            # The glob only pins the first digit, so skip names whose
            # suffix is not a pure integer.
            try:
                steps.add(int(p.stem.split(".")[-1]))
            except ValueError:
                continue
        return sorted(steps)

    # --- field IO --------------------------------------------------------

    def load_field(self, var, step: int, *, data_name: Optional[str] = None) -> None:
        """Read the saved field for *var* at *step* into *var* in place.

        Thin wrapper around ``var.read_timestep`` that fills in this
        run's directory and the workflow's filename stem. The h5 layer
        does kd-tree interpolation when *var*'s mesh / FE space differs
        from what was written, which is what makes the warm-start
        recipe work across degrees and meshes.

        ``data_name`` defaults to *var*'s constructor name (e.g. ``"T"``).

        Raises
        ------
        ValueError
            If no ``data_name`` is given and *var* has no ``.name``.
        """
        name = data_name if data_name is not None else getattr(var, "name", None)
        if name is None:
            raise ValueError(
                "load_field needs a data_name when var has no .name attribute"
            )
        var.read_timestep(
            data_filename=RUN_NAME,
            data_name=name,
            index=step,
            outputPath=str(self.path),
        )

    # --- timeseries ------------------------------------------------------

    @property
    def timeseries_path(self) -> Path:
        return self.path / "timeseries.csv"

    @property
    def timeseries(self) -> list[dict]:
        """Read ``timeseries.csv`` as a list of typed-row dicts.

        ``step`` is parsed as int; ``t``, ``dt`` and every other column
        are parsed via :func:`_csv_to_float` (so '---' / empty cells
        become NaN — including in ``t`` / ``dt``, which
        :meth:`append_timeseries_row` writes as '---' when given
        ``None`` or a non-finite value). Missing file → ``[]``.

        Columns absent from the file are absent from the row dicts —
        callers that need legacy compatibility should fill in defaults
        themselves. Surplus cells beyond the header (which have no
        column name) are dropped.
        """
        path = self.timeseries_path
        if not path.exists():
            return []
        out = []
        # newline="" is what the csv module requires for its own
        # line-ending handling.
        with open(path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                parsed = {
                    "step": int(row["step"]),
                    "t": _csv_to_float(row["t"]),
                    "dt": _csv_to_float(row["dt"]),
                }
                for k, v in row.items():
                    # DictReader collects surplus cells under the key
                    # None (as a list); skip them along with the columns
                    # already parsed above.
                    if k is None or k in parsed:
                        continue
                    parsed[k] = _csv_to_float(v)
                out.append(parsed)
        return out

    def append_step(
        self,
        step: int,
        t: float,
        dt: float,
        mesh,
        mesh_vars: list,
        diags: dict,
        fields: tuple[str, ...],
    ) -> None:
        """Write an h5 checkpoint at *step* and append a timeseries row.

        Combines the two operations a time-loop workflow does on every
        save_every tick — ``mesh.write_timestep`` plus
        :meth:`append_timeseries_row` — into one call so the caller's
        loop body shrinks from four lines to one.

        Parameters
        ----------
        step : int
            Step index (used as the h5 filename suffix and the
            ``"step"`` column of the timeseries row).
        t, dt : float
            Simulated time and step size; written into the
            ``"t"`` and ``"dt"`` columns.
        mesh : underworld3 Mesh
            Mesh whose ``write_timestep`` will be called.
        mesh_vars : list of MeshVariable
            The variables to checkpoint. Passed to
            ``mesh.write_timestep(meshVars=...)``.
        diags : dict
            Diagnostic columns to write alongside step/t/dt. Keys
            should match the workflow's timeseries schema.
        fields : tuple of str
            The timeseries CSV column order — same value passed to
            :meth:`append_timeseries_row`.
        """
        mesh.write_timestep(
            filename=RUN_NAME,
            index=step,
            outputPath=str(self.path),
            meshVars=mesh_vars,
        )
        row = {"step": step, "t": t, "dt": dt, **diags}
        self.append_timeseries_row(row, fields)

    def append_timeseries_row(
        self, row: dict, fields: tuple[str, ...]
    ) -> None:
        """Append one row to ``timeseries.csv``, with schema migration.

        If the file exists with a header that doesn't match *fields*,
        the file is rewritten under the new schema (with missing
        columns serialising as ``'---'``; columns not in *fields* are
        dropped). Otherwise the row is appended, with the header
        auto-added on first write. Keys of *row* that are not in
        *fields* are ignored.

        ``None`` and non-finite floats in *row* are written as
        ``'---'`` (visually distinct from numeric zero).
        """
        path = self.timeseries_path
        fields = tuple(fields)
        if path.exists():
            # Parse the header with csv (not str.split) so quoted
            # column names compare correctly. An empty file yields ()
            # and is therefore "migrated" to a fresh header.
            with open(path, newline="", encoding="utf-8") as f:
                existing_fields = tuple(next(csv.reader(f), ()))
            if existing_fields != fields:
                self._rewrite_timeseries(fields)
        is_new = not path.exists()
        with open(path, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(fields))
            if is_new:
                writer.writeheader()
            writer.writerow({k: _format_for_csv(row.get(k)) for k in fields})

    def _rewrite_timeseries(self, fields: tuple[str, ...]) -> None:
        """Rewrite the existing ``timeseries.csv`` under *fields*.

        Rows are re-read via :attr:`timeseries`, written to a sibling
        ``.tmp`` file, then moved over the original, so a crash
        mid-migration leaves the old file intact instead of truncated.
        """
        path = self.timeseries_path
        old_rows = self.timeseries
        tmp = path.with_name(path.name + ".tmp")
        with open(tmp, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(fields))
            writer.writeheader()
            writer.writerows(
                {k: _format_for_csv(r.get(k)) for k in fields} for r in old_rows
            )
        tmp.replace(path)

    # --- summary --------------------------------------------------------

    @property
    def summary_path(self) -> Path:
        return self.path / "run_summary.yaml"

    @property
    def summary(self) -> Optional[dict]:
        """Contents of ``run_summary.yaml``, or ``None`` if absent.

        An empty file also reads back as ``None`` (``yaml.safe_load``
        of an empty document).
        """
        p = self.summary_path
        if not p.exists():
            return None
        with open(p, encoding="utf-8") as f:
            return yaml.safe_load(f)

    def write_summary(self, summary: dict) -> None:
        """Write ``run_summary.yaml`` (the steady-state "done" marker).

        Written via a temp file + rename so readers never observe a
        partially written marker.
        """
        self._dump_yaml_atomic(summary, self.summary_path)

    @staticmethod
    def _dump_yaml_atomic(data, path: Path) -> None:
        """Dump *data* as block-style YAML to *path* via ``<path>.tmp``."""
        tmp = path.with_name(path.name + ".tmp")
        with open(tmp, "w", encoding="utf-8") as f:
            yaml.dump(data, f, default_flow_style=False, sort_keys=False)
        tmp.replace(path)

    # --- lifecycle ------------------------------------------------------

    def archive(self) -> Optional[Path]:
        """Rename the directory to ``<name>.archive-<UTC stamp>/``.

        Never deletes. If an archive with the same one-second stamp
        already exists (two archives within the same second), a
        ``-1``, ``-2``, … suffix is appended instead of colliding with
        it. Returns the new archive path, or ``None`` if the directory
        doesn't exist.
        """
        if not self.path.exists():
            return None
        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        stem = f"{self.path.name}.archive-{ts}"
        archive = self.path.parent / stem
        n = 1
        # Not race-free against a concurrent archiver, but prevents the
        # common same-second collision (rename onto an existing archive
        # fails, or on POSIX silently replaces an empty one).
        while archive.exists():
            archive = self.path.parent / f"{stem}-{n}"
            n += 1
        self.path.rename(archive)
        return archive