r"""
Time derivative approximations for transient problems.
This module provides classes for computing time derivatives (D/Dt) using
various numerical schemes. These are used within solvers to discretize the
time dimension of PDEs.
All DDt classes share a common interface:
- ``update(dt, ...)`` — alias for ``update_pre_solve``
- ``update_pre_solve(dt, ...)`` — called before the PDE solve
- ``update_post_solve(dt, ...)`` — called after the PDE solve
- ``bdf(order)`` — backward differentiation formula (returns Δψ, divide by Δt for rate)
- ``adams_moulton_flux(order)`` — weighted flux for implicit integration
**Symbolic** -- Pure symbolic history, no mesh storage. Used for flux
tracking in SNES_Diffusion where the flux expression is a SymPy tree.
**Eulerian** -- Fixed-grid time derivative with optional grid-based
advection. When ``V_fn`` is provided, ``update_pre_solve`` applies an
explicit advection correction so that ``bdf()`` approximates the full
material derivative.
**SemiLagrangian** -- Characteristic-based D/Dt via departure points.
Traces backward along velocity field to sample upstream values.
Unconditionally stable for advection (no CFL constraint).
**Lagrangian** -- Full particle-following D/Dt. Creates and manages
its own swarm, advected during ``update_post_solve``.
**Lagrangian_Swarm** -- Specialized swarm-based Lagrangian using a
user-provided swarm. Swarm advection is the user's responsibility.
Notes
-----
The choice of time derivative scheme affects accuracy, stability, and
computational cost:
- **Eulerian**: Fixed grid, simple, best for weak-to-moderate advection
or purely diffusive problems (e.g. Richards equation with no transport).
- **Eulerian (with V_fn)**: Adds explicit advection correction — subject
to CFL but suitable when velocity is moderate.
- **Semi-Lagrangian**: Best for advection-dominated problems (high Péclet).
Unconditionally stable but breaks down when v → 0.
- **Lagrangian**: Tracks material properties without numerical diffusion.
Best for purely advected quantities (e.g. composition).
See Also
--------
underworld3.systems.solvers : PDE solvers using these time derivatives.
"""
import sympy
from sympy import sympify
import numpy as np
from typing import Optional, Callable, Union
import underworld3 as uw
from underworld3 import VarType
import underworld3.timing as timing
from underworld3.utilities._api_tools import uw_object
from petsc4py import PETSc
def _as_float(value):
"""Extract a plain float from various numeric types (Pint, UWQuantity, etc.)."""
if value is None:
return None
if isinstance(value, (int, float)):
return float(value)
if hasattr(value, "magnitude"):
return float(value.magnitude)
if hasattr(value, "value"):
return float(value.value)
try:
return float(value)
except (TypeError, ValueError):
return None
def _bdf_coefficients(order, dt_current, dt_history):
r"""Compute BDF coefficients, handling variable timesteps.
For uniform timesteps the classical BDF-k coefficients are returned
as exact ``sympy.Rational`` values. When the timestep ratio differs
from unity the variable-step BDF-2 formula is used (BDF-3 falls back
to variable-step BDF-2 when the ratio exceeds 5 %).
Parameters
----------
order : int
BDF order (1, 2, or 3).
dt_current : float or None
Current timestep :math:`\Delta t_n`.
dt_history : list
Previous timesteps ``[dt_{n-1}, dt_{n-2}, ...]``.
Returns
-------
list of sympy.Basic
Coefficients ``[c0, c1, c2, ...]`` such that
.. math::
\Delta\psi_{\mathrm{BDF}} = c_0\,\psi^{n+1}
+ c_1\,\psi^n + c_2\,\psi^{n-1} + \cdots
Notes
-----
**BDF-1** (backward Euler) always returns ``[1, -1]``.
**BDF-2** with variable step uses
:math:`r = \Delta t_n / \Delta t_{n-1}`:
.. math::
c_0 = \frac{1+2r}{1+r}, \quad
c_1 = -(1+r), \quad
c_2 = \frac{r^2}{1+r}
which reduces to :math:`[3/2,\;-2,\;1/2]` when :math:`r=1`.
**BDF-3** uses the constant-step formula when all ratios are within
5 % of unity; otherwise it falls back to variable-step BDF-2.
"""
if order <= 1:
return [sympy.Integer(1), sympy.Integer(-1)]
dt_n = _as_float(dt_current)
dt_nm1 = (
_as_float(dt_history[0])
if len(dt_history) > 0 and dt_history[0] is not None
else None
)
if order == 2:
if dt_n is not None and dt_nm1 is not None and dt_nm1 > 0:
r = dt_n / dt_nm1
if abs(r - 1.0) < 1e-12:
# Exactly constant dt — use exact rationals
return [
sympy.Rational(3, 2),
sympy.Integer(-2),
sympy.Rational(1, 2),
]
else:
# Variable dt
return [
sympy.sympify((1 + 2 * r) / (1 + r)),
sympy.sympify(-(1 + r)),
sympy.sympify(r**2 / (1 + r)),
]
else:
# No usable history — constant-dt fallback
return [
sympy.Rational(3, 2),
sympy.Integer(-2),
sympy.Rational(1, 2),
]
# order >= 3
dt_nm2 = (
_as_float(dt_history[1])
if len(dt_history) > 1 and dt_history[1] is not None
else None
)
if (
dt_n is not None
and dt_nm1 is not None
and dt_nm2 is not None
and dt_nm1 > 0
and dt_nm2 > 0
):
r1 = dt_n / dt_nm1
r2 = dt_nm1 / dt_nm2
if abs(r1 - 1.0) < 0.05 and abs(r2 - 1.0) < 0.05:
# Approximately constant dt — standard BDF-3
return [
sympy.Rational(11, 6),
sympy.Integer(-3),
sympy.Rational(3, 2),
sympy.Rational(-1, 3),
]
else:
# Variable dt — fall back to BDF-2 with variable coefficients
return _bdf_coefficients(2, dt_current, dt_history)
else:
# Insufficient history — constant-dt BDF-3
return [
sympy.Rational(11, 6),
sympy.Integer(-3),
sympy.Rational(3, 2),
sympy.Rational(-1, 3),
]
# ============================================================================
# BDF/AM Coefficient Expressions
# ============================================================================
#
# These helpers create UWexpression coefficient objects and build fixed-structure
# symbolic expressions for bdf() and adams_moulton_flux(). The coefficients are
# routed through PETSc's constants[] array by the JIT compiler, so changing
# effective_order or variable dt only requires PetscDSSetConstants() — no
# recompilation.
# ============================================================================
from underworld3.function.expressions import UWexpression as _UWexpression
def _create_coefficients(order, prefix, instance_id):
"""Create UWexpression objects for BDF or AM coefficients.
Parameters
----------
order : int
Maximum order (number of history terms). Creates order+1 coefficients.
prefix : str
LaTeX prefix for display (e.g. "c^{BDF}" or "a^{AM}").
instance_id : int
Unique ID to disambiguate coefficients from different DDt instances.
Returns
-------
list of UWexpression
Coefficient expressions initialised to 0.0.
"""
coeffs = []
for i in range(order + 1):
c = _UWexpression(
rf"{prefix}_{{{i},{instance_id}}}",
sym=0.0,
description=f"{prefix} coefficient {i} (DDt instance {instance_id})",
_unique_name_generation=True,
)
coeffs.append(c)
return coeffs
def _update_bdf_values(coeffs, effective_order, dt, dt_history):
"""Update BDF coefficient UWexpression values for current state.
Sets active coefficients from _bdf_coefficients() and zeroes the rest.
"""
values = _bdf_coefficients(effective_order, dt, dt_history)
for i, v in enumerate(values):
coeffs[i].sym = float(v)
for i in range(len(values), len(coeffs)):
coeffs[i].sym = 0.0
def _update_am_values(coeffs, effective_order, theta=0.5):
"""Update Adams-Moulton coefficient UWexpression values for current state.
AM coefficients for each order (constant-dt formulas):
- Order 0: [1]
- Order 1: [theta, 1-theta]
- Order 2: [5/12, 8/12, -1/12]
- Order 3: [9/24, 19/24, -5/24, 1/24]
"""
if effective_order <= 0:
values = [1.0]
elif effective_order == 1:
values = [float(theta), 1.0 - float(theta)]
elif effective_order == 2:
values = [5.0 / 12, 8.0 / 12, -1.0 / 12]
elif effective_order >= 3:
values = [9.0 / 24, 19.0 / 24, -5.0 / 24, 1.0 / 24]
for i, v in enumerate(values):
coeffs[i].sym = v
for i in range(len(values), len(coeffs)):
coeffs[i].sym = 0.0
def _build_weighted_sum(coeffs, psi_fn, psi_star_syms):
"""Build a fixed-structure weighted sum: c0*psi + c1*psi_star[0] + ...
The symbolic structure includes all terms up to len(coeffs)-1.
Inactive terms have coefficient=0 and vanish numerically.
Parameters
----------
coeffs : list of UWexpression
Coefficient expressions (length = order + 1).
psi_fn : sympy expression
Current-time field expression.
psi_star_syms : list of sympy expressions
History term symbolic expressions (psi_star[i].sym or psi_star[i]).
Returns
-------
sympy expression
The weighted sum.
"""
result = coeffs[0] * psi_fn
for i in range(len(psi_star_syms)):
if i + 1 < len(coeffs):
result = result + coeffs[i + 1] * psi_star_syms[i]
return result
[docs]
class Symbolic(uw_object):
r"""
Symbolic history manager for time derivative approximations.
Manages the update of a variable :math:`\psi` across timesteps. The history
operator stores :math:`\psi` over several timesteps (given by ``order``) so
that it can compute backward differentiation (BDF) or Adams-Moulton expressions.
The history operator is defined as:
.. math::
\psi_p^{t-n\Delta t} &\leftarrow \psi_p^{t-(n-1)\Delta t} \\
\psi_p^{t-(n-1)\Delta t} &\leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \\
\psi_p^{t-\Delta t} &\leftarrow \psi_p^{t}
This is a purely symbolic history manager that operates on sympy expressions
without mesh or swarm storage. It is useful for tracking symbolic expressions
through time-stepping algorithms.
Parameters
----------
psi_fn : sympy.Basic
The sympy expression to track. Can be scalar or matrix form.
theta : float, optional
Implicitness parameter for Adams-Moulton order 1 (default ``0.5``).
Values: 0 = explicit, 1 = implicit, 0.5 = Crank-Nicolson.
varsymbol : str, optional
LaTeX symbol for display (default ``r"\\psi"``).
verbose : bool, optional
Enable verbose output (default ``False``).
bcs : list, optional
Boundary conditions (default ``[]``).
order : int, optional
Order of time integration (1-3) (default ``1``).
smoothing : float, optional
Smoothing parameter (default ``0.0``).
Notes
-----
The ``Symbolic`` class is the base for understanding BDF and Adams-Moulton
formulas without the complexity of mesh or swarm storage. It is primarily
useful for:
- Understanding time-stepping algorithm behavior
- Debugging symbolic expressions in time-dependent problems
- Prototyping before implementing with mesh/swarm storage
For actual simulations, use ``Eulerian``, ``SemiLagrangian``, or
``Lagrangian`` which store history on computational meshes or swarms.
See Also
--------
Eulerian : Mesh-based history with BDF time-stepping.
SemiLagrangian : Nodal-swarm approach for advection-dominated problems.
Lagrangian : Swarm-based material tracking.
"""
[docs]
@timing.routine_timer_decorator
def __init__(
self,
psi_fn: sympy.Basic, # a sympy expression for ψ; can be scalar or matrix
theta: Optional[float] = 0.5,
varsymbol: Optional[str] = r"\psi",
verbose: Optional[bool] = False,
bcs=[],
order: int = 1,
smoothing: float = 0.0,
):
super().__init__()
self.theta = theta
self.bcs = bcs
self.verbose = verbose
self.smoothing = smoothing
self.order = order
# History tracking: deferred initialization and effective order
self._history_initialised = False
self._n_solves_completed = 0
self._dt = None # current timestep (set by solver or update_pre_solve)
self._dt_history = [None] * order # previous timesteps for variable-dt BDF
# Ensure psi_fn is a sympy Matrix.
if not isinstance(psi_fn, sympy.Matrix):
try:
psi_fn = sympy.Matrix(psi_fn)
except Exception:
psi_fn = sympy.Matrix([[psi_fn]])
self._psi_fn = psi_fn # stored with its native shape
self._shape = psi_fn.shape # capture the shape
# Set the display symbol for psi_fn and for the history variable.
self._psi_fn_symbol = varsymbol # e.g. "\psi"
self._psi_star_symbol = varsymbol + r"^\ast" # e.g. "\psi^\ast"
# Create the history list: each element is a Matrix of shape _shape.
self.psi_star = [sympy.zeros(*self._shape) for _ in range(order)]
# BDF/AM coefficient UWexpressions — routed through PetscDS constants[]
self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number)
self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number)
# Initialise to order-1 values
_update_bdf_values(self._bdf_coeffs, 1, None, [])
_update_am_values(self._am_coeffs, 1, self.theta)
return
@property
def psi_fn(self):
r"""Current symbolic expression :math:`\psi` being tracked."""
return self._psi_fn
@psi_fn.setter
def psi_fn(self, new_fn):
"""Set the tracked symbolic expression."""
if not isinstance(new_fn, sympy.Matrix):
try:
new_fn = sympy.Matrix(new_fn)
except Exception:
new_fn = sympy.Matrix([[new_fn]])
# Optionally, one could check for matching shape; here we update both.
self._psi_fn = new_fn
self._shape = new_fn.shape
return
def _object_viewer(self):
from IPython.display import Latex, display
# Display the primary variable
display(Latex(rf"$\quad {self._psi_fn_symbol} = {sympy.latex(self._psi_fn)}$"))
# Display the history variable using the different symbol.
history_latex = ", ".join([sympy.latex(elem) for elem in self.psi_star])
display(Latex(rf"$\quad {self._psi_star_symbol} = \left[{history_latex}\right]$"))
@property
def effective_order(self):
"""Current effective BDF order, accounting for history startup.
For BDF order k, k distinct history values are needed. During
startup, ``effective_order`` ramps from 1 to ``self.order`` as
successive solves populate the history slots with distinct values.
"""
# BDF-k requires k completed solves to have k distinct history values.
# With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves.
return min(self.order, max(1, self._n_solves_completed))
[docs]
def update_history_fn(self):
r"""Copy current :math:`\psi` to the first history slot ``psi_star[0]``."""
# Update the first history element with a copy of the current ψ.
self.psi_star[0] = self.psi_fn.copy()
[docs]
def initialise_history(self):
r"""Initialize all history slots to the current value of :math:`\psi`.
Called automatically on the first ``update_pre_solve``. Can also
be called manually after setting initial conditions.
"""
self.update_history_fn()
# Propagate the initial history to all history steps.
for i in range(1, self.order):
self.psi_star[i] = self.psi_star[0].copy()
self._history_initialised = True
return
[docs]
def initiate_history_fn(self):
"""Deprecated: use ``initialise_history`` instead."""
self.initialise_history()
[docs]
def update(
self,
dt,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
"""Update history (alias for ``update_pre_solve``)."""
self.update_pre_solve(dt, evalf, verbose)
return
[docs]
def update_pre_solve(
self,
dt,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
"""Pre-solve update hook. Auto-initialises history on first call."""
self._dt = dt
if not self._history_initialised:
self.initialise_history()
# Update coefficient values for current effective_order and dt
_update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history)
_update_am_values(self._am_coeffs, self.effective_order, self.theta)
return
[docs]
def update_post_solve(
self,
dt,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
r"""Shift history chain after solve: :math:`\psi^{*n} \leftarrow \psi^{*(n-1)}`."""
self._dt = dt
if verbose:
print(f"Updating history for ψ = {self.psi_fn}", flush=True)
# Record timestep history for variable-dt BDF
for i in range(self.order - 1, 0, -1):
self._dt_history[i] = self._dt_history[i - 1]
self._dt_history[0] = dt
# Shift history: copy each element down the chain.
for i in range(self.order - 1, 0, -1):
self.psi_star[i] = self.psi_star[i - 1].copy()
self.update_history_fn()
if self._n_solves_completed < self.order:
self._n_solves_completed += 1
return
@property
def bdf_coefficients(self):
"""Current BDF coefficients [c0, c1, ...] accounting for variable timesteps."""
return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs]
def bdf(self, order: Optional[int] = None):
r"""Backward differentiation approximation of the time-derivative of ψ.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. The coefficient values are updated each step in
``update_pre_solve`` — no JIT recompilation needed when the
order ramps up or the timestep changes.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility). The effective order is
controlled by the coefficient values.
"""
return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, self.psi_star)
[docs]
def adams_moulton_flux(self, order: Optional[int] = None):
r"""Adams-Moulton flux approximation for implicit time integration.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. Values are updated each step in ``update_pre_solve``.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility).
"""
return _build_weighted_sum(self._am_coeffs, self.psi_fn, self.psi_star)
[docs]
class Eulerian(uw_object):
r"""
Eulerian (mesh-based) history manager for time derivatives.
Manages the update of a variable :math:`\psi` on the mesh across timesteps,
storing history values on mesh variables for backward differentiation.
.. math::
\psi_p^{t-n\Delta t} &\leftarrow \psi_p^{t-(n-1)\Delta t} \\
\psi_p^{t-(n-1)\Delta t} &\leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \\
\psi_p^{t-\Delta t} &\leftarrow \psi_p^{t}
When ``V_fn`` is provided, the ``update_pre_solve`` method applies an
explicit advection correction so that ``bdf()`` approximates the full
material derivative :math:`D\psi/Dt = \partial\psi/\partial t + \mathbf{u}\cdot\nabla\psi`
rather than the partial time derivative alone.
.. note::
The optional advection capability (V_fn parameter) is suitable for
problems where the advection is weak or where a purely grid-based
approach is desired (e.g., Richards equation with no transport).
For advection-dominated problems, SemiLagrangian is more mature and
generally preferred.
Parameters
----------
mesh : underworld3.discretisation.Mesh
The computational mesh.
psi_fn : MeshVariable or sympy.Basic
The quantity to track. Can be a mesh variable or symbolic expression.
vtype : VarType
Variable type (SCALAR, VECTOR, etc.) for history storage.
degree : int
Polynomial degree for history mesh variables.
continuous : bool
Whether history variables are continuous across element boundaries.
V_fn : sympy.Basic, optional
Velocity field for grid-based advection correction.
If None (default), computes pure ∂ψ/∂t. If set, computes
D/Dt = ∂/∂t + u·∇ via operator splitting.
evalf : bool, default=False
If True, evaluate expressions numerically during updates.
theta : float, default=0.5
Time-stepping parameter for implicit/explicit blending.
theta=0 is fully explicit, theta=1 is fully implicit.
varsymbol : str, default=r"u"
LaTeX symbol for display.
verbose : bool, default=False
Enable verbose output during updates.
bcs : list, default=[]
Boundary conditions to apply to projections.
order : int, default=1
Number of history timesteps to store (for multi-step methods).
smoothing : float, default=0.0
Smoothing parameter for projections.
See Also
--------
SemiLagrangian : For advection-dominated problems with nodal swarm.
Lagrangian : For full Lagrangian tracking on swarms.
Symbolic : For purely symbolic history (no mesh storage).
"""
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
psi_fn: Union[
uw.discretisation.MeshVariable, sympy.Basic
], # sympy function or mesh variable
vtype: uw.VarType,
degree: int,
continuous: bool,
V_fn=None,
evalf: Optional[bool] = False,
theta: Optional[float] = 0.5,
varsymbol: Optional[str] = r"u",
verbose: Optional[bool] = False,
bcs=[],
order=1,
smoothing=0.0,
):
super().__init__()
self.mesh = mesh
self.V_fn = V_fn
self.theta = theta
self.bcs = bcs
self.verbose = verbose
self.degree = degree
self.vtype = vtype
self.continuous = continuous
self.smoothing = smoothing
self.evalf = evalf
# History tracking: deferred initialization and effective order
self._history_initialised = False
self._n_solves_completed = 0
# meshVariables are required for:
#
# u(t) - evaluation of u_fn at the current time
# u*(t) - u_* evaluated from
# psi is evaluated/stored at `order` timesteps. We can't
# be sure if psi is a meshVariable or a function to be evaluated
# psi_star is reaching back through each evaluation and has to be a
# meshVariable (storage)
if isinstance(psi_fn, uw.discretisation.MeshVariable):
self._psi_fn = psi_fn.sym ### get symbolic form of the meshvariable
self._psi_meshVar = psi_fn
else:
self._psi_fn = psi_fn ### already in symbolic form
self._psi_meshVar = None
self.order = order
self._dt = None # current timestep (set by solver or update_pre_solve)
self._dt_history = [None] * order # previous timesteps for variable-dt BDF
psi_star = []
self.psi_star = psi_star
for i in range(order):
self.psi_star.append(
uw.discretisation.MeshVariable(
f"psi_star_Eulerian_{self.instance_number}_{i}",
self.mesh,
vtype=vtype,
degree=degree,
continuous=continuous,
varsymbol=rf"{varsymbol}^{{ {'*'*(i+1)} }}",
)
)
# BDF/AM coefficient UWexpressions — routed through PetscDS constants[]
self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number)
self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number)
# Initialise to order-1 values
_update_bdf_values(self._bdf_coeffs, 1, None, [])
_update_am_values(self._am_coeffs, 1, self.theta)
return
@property
def psi_fn(self):
r"""Current symbolic expression :math:`\psi` being tracked."""
return self._psi_fn
@psi_fn.setter
def psi_fn(self, new_fn):
"""Set the tracked expression."""
self._psi_fn = new_fn
# self._psi_star_projection_solver.uw_function = self.psi_fn
return
def _object_viewer(self):
from IPython.display import Latex, Markdown, display
super()._object_viewer()
## feedback on this instance
# display(Latex(r"$\quad\psi = $ " + self.psi._repr_latex_()))
# display(
# Latex(
# r"$\quad\Delta t_{\textrm{phys}} = $ "
# + sympy.sympify(self.dt_physical)._repr_latex_()
# )
# )
display(Latex(rf"$\quad$History steps = {self.order}"))
def _setup_projections(self):
"""Initialize projection solvers for history updates."""
### using this to store terms that can't be evaluated (e.g. derivatives)
# The projection operator for mapping derivative values to the mesh - needs to be different for each variable type, unfortunately ...
if self.vtype == uw.VarType.SCALAR:
self._psi_star_projection_solver = uw.systems.solvers.SNES_Projection(
self.mesh, self.psi_star[0], verbose=False
)
elif self.vtype == uw.VarType.VECTOR:
self._psi_star_projection_solver = uw.systems.solvers.SNES_Vector_Projection(
self.mesh, self.psi_star[0], verbose=False
)
elif self.vtype == uw.VarType.SYM_TENSOR or self.vtype == uw.VarType.TENSOR:
import math
dim = self.mesh.dim
if self.vtype == uw.VarType.SYM_TENSOR:
Nc = math.comb(dim + 1, 2) # 3 in 2D, 6 in 3D
self._psi_star_indep_indices = [
(i, j) for i in range(dim) for j in range(i, dim)
]
else:
Nc = dim * dim
self._psi_star_indep_indices = [
(i, j) for i in range(dim) for j in range(dim)
]
self._psi_star_flat_var = uw.discretisation.MeshVariable(
f"psi_star_flat_{self.instance_number}",
self.mesh,
(1, Nc),
vtype=uw.VarType.MATRIX,
degree=self.degree,
continuous=self.continuous,
varsymbol=r"{\psi^{*}_{\mathrm{flat}}}",
)
self._psi_star_projection_solver = uw.systems.solvers.SNES_MultiComponent_Projection(
self.mesh,
u_Field=self._psi_star_flat_var,
n_components=Nc,
degree=self.degree,
verbose=False,
)
self._psi_star_use_multicomponent = True
if getattr(self, '_psi_star_use_multicomponent', False):
# Flatten tensor to (1, Nc) row for multicomponent solver
import sympy
indep = self._psi_star_indep_indices
row = sympy.Matrix([[self.psi_fn[i, j] for (i, j) in indep]])
self._psi_star_projection_solver.uw_function = row
else:
self._psi_star_projection_solver.uw_function = self.psi_fn
self._psi_star_projection_solver.bcs = self.bcs
self._psi_star_projection_solver.smoothing = self.smoothing
@property
def effective_order(self):
"""Current effective BDF order, accounting for history startup.
For BDF order k, k distinct history values are needed. During
startup, ``effective_order`` ramps from 1 to ``self.order`` as
successive solves populate the history slots with distinct values.
"""
# BDF-k requires k completed solves to have k distinct history values.
# With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves.
return min(self.order, max(1, self._n_solves_completed))
[docs]
def update_history_fn(self):
r"""Copy current :math:`\psi` to ``psi_star[0]`` via evaluation or projection."""
### update first value in history chain
### avoids projecting if function can be evaluated
try:
try:
self.psi_star[0].data[...] = self._psi_meshVar.data[...]
except:
self.psi_star[0].data[...] = uw.function.evaluate(
self.psi_fn,
self.psi_star[0].coords,
evalf=self.evalf,
).reshape(-1, max(self.psi_fn.shape))
except:
self._setup_projections()
self._psi_star_projection_solver.solve()
[docs]
def initialise_history(self):
r"""Initialize all history slots to the current value of :math:`\psi`.
Called automatically on the first ``update_pre_solve``. Can also
be called manually after setting initial conditions to ensure
the history chain starts from the correct state.
"""
self.update_history_fn()
### set up all history terms to the initial values
for i in range(self.order - 1, 0, -1):
self.psi_star[i].data[...] = self.psi_star[0].data[...]
self._history_initialised = True
return
[docs]
def initiate_history_fn(self):
"""Deprecated: use ``initialise_history`` instead."""
self.initialise_history()
[docs]
def update(
self,
dt,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
"""Update history (alias for ``update_pre_solve``)."""
self.update_pre_solve(dt, evalf, verbose)
return
[docs]
def update_pre_solve(
self,
dt,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
"""Pre-solve: auto-initialise history and apply advection correction.
On the first call, automatically initialises history from the
current field values. If V_fn is set, also applies an explicit
grid-based advection correction so that bdf() approximates the
material derivative Dφ/Dt rather than ∂φ/∂t.
"""
self._dt = dt
if not self._history_initialised:
self.initialise_history()
# Update coefficient values for current effective_order and dt
_update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history)
_update_am_values(self._am_coeffs, self.effective_order, self.theta)
if self.V_fn is not None and dt is not None:
coords = self.psi_star[0].coords
dim = self.mesh.dim
X = self.mesh.X
# Build u·∇φ symbolically for each component of psi_fn
# psi_fn is a Matrix; V_fn is also a Matrix. For scalar
# psi_fn the shape is (1,1); for vector it is (1,dim).
psi = self.psi_fn
V = self.V_fn
ncomp = max(psi.shape) # number of tracked components
for c in range(ncomp):
# ∂φ_c/∂x_i for each spatial dimension
grad_c = sympy.Matrix([psi[c].diff(X[i]) for i in range(dim)])
# u·∇φ_c = V_i * ∂φ_c/∂x_i
advection_expr = sum(V[i] * grad_c[i] for i in range(dim))
advection_vals = uw.function.evaluate(
advection_expr, coords, evalf=evalf,
).reshape(-1)
self.psi_star[0].data[:, c] -= dt * advection_vals
return
[docs]
def update_post_solve(
self,
dt,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
r"""Shift history chain after solve: :math:`\psi^{*n} \leftarrow \psi^{*(n-1)}`."""
self._dt = dt
if verbose and uw.mpi.rank == 0:
print(f"Update {self.psi_fn}", flush=True)
# Record timestep history for variable-dt BDF
for i in range(self.order - 1, 0, -1):
self._dt_history[i] = self._dt_history[i - 1]
self._dt_history[0] = dt
### copy values down the chain
for i in range(self.order - 1, 0, -1):
self.psi_star[i].data[...] = self.psi_star[i - 1].data[...]
### update the history fn
self.update_history_fn()
if self._n_solves_completed < self.order:
self._n_solves_completed += 1
return
@property
def bdf_coefficients(self):
"""Current BDF coefficients [c0, c1, ...] accounting for variable timesteps."""
return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs]
def bdf(self, order=None):
r"""Backward differentiation approximation of the time-derivative of :math:`\psi`.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. Values are updated each step in ``update_pre_solve``.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility).
"""
return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs]
def adams_moulton_flux(self, order=None):
r"""Adams-Moulton flux approximation for implicit time integration.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. Values are updated each step in ``update_pre_solve``.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility).
"""
return _build_weighted_sum(self._am_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs]
class SemiLagrangian(uw_object):
r"""
Semi-Lagrangian history manager using nodal swarm.
Manages the semi-Lagrangian update of a mesh variable :math:`\psi`
across timesteps. Uses a nodal swarm to track departure points and
interpolate values back to the mesh.
.. math::
\psi_p^{t-n\Delta t} &\leftarrow \psi_p^{t-(n-1)\Delta t} \\
\psi_p^{t-(n-1)\Delta t} &\leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \\
\psi_p^{t-\Delta t} &\leftarrow \psi_p^{t}
The semi-Lagrangian method traces characteristics backward in time
to find departure points, providing stable advection without CFL
restrictions while maintaining accuracy.
Parameters
----------
mesh : underworld3.discretisation.Mesh
The computational mesh.
psi_fn : sympy.Function
The quantity to advect (typically a mesh variable's symbolic form).
V_fn : sympy.Function
Velocity field for advection (e.g., ``stokes.u.sym``).
vtype : VarType
Variable type (SCALAR, VECTOR, SYM_TENSOR, etc.).
degree : int
Polynomial degree for mesh variable storage.
continuous : bool
Whether variables are continuous across element boundaries.
swarm_degree : int, optional
Polynomial degree for swarm interpolation. Defaults to ``degree``.
swarm_continuous : bool, optional
Continuity for swarm variables. Defaults to ``continuous``.
varsymbol : str, optional
LaTeX symbol for display.
verbose : bool, default=False
Enable verbose output during updates.
bcs : list, default=[]
Boundary conditions for projections.
order : int, default=1
Number of history timesteps (1 for first-order, 2 for second-order).
smoothing : float, default=0.0
Smoothing parameter for projections.
preserve_moments : bool, default=False
Use moment-preserving projection (experimental).
Notes
-----
The semi-Lagrangian method is particularly useful for:
- Advection-dominated problems (high Péclet number)
- Problems where CFL stability is restrictive
- Viscoelastic stress advection
See Also
--------
Eulerian : For fixed-mesh time derivatives without advection.
Lagrangian : For full particle-following Lagrangian tracking.
"""
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
psi_fn: sympy.Function,
V_fn: sympy.Function,
vtype: uw.VarType,
degree: int,
continuous: bool,
swarm_degree: Optional[int] = None,
swarm_continuous: Optional[bool] = None,
varsymbol: Optional[str] = None,
verbose: Optional[bool] = False,
bcs=[],
order=1,
smoothing=0.0,
preserve_moments=False,
):
super().__init__()
self.mesh = mesh
self.bcs = bcs
self.verbose = verbose
self.degree = degree
self.continuous = continuous
self._psi_fn = psi_fn
self.V_fn = V_fn
self.order = order
self.preserve_moments = preserve_moments
# History tracking: deferred initialization and effective order
self._history_initialised = False
self._n_solves_completed = 0
self._dt = None # current timestep (set by solver or update_pre_solve)
self._dt_history = [None] * order # previous timesteps for variable-dt BDF
if swarm_degree is None:
self.swarm_degree = degree
else:
self.swarm_degree = swarm_degree
if swarm_continuous is None:
self.swarm_continuous = continuous
else:
self.swarm_continuous = swarm_continuous
if varsymbol is None:
varsymbol = rf"u_{{ [{self.instance_number}] }}"
# meshVariables are required for:
#
# u(t) - evaluation of u_fn at the current time
# u*(t) - u_* evaluated from
# psi is evaluated/stored at `order` timesteps. We can't
# be sure if psi is a meshVariable or a function to be evaluated
# but psi_star is reaching back through each evaluation and has to be a
# meshVariable (storage)
psi_star = []
self.psi_star = psi_star
# Propagate units from psi_fn to psi_star if the model supports units.
# Internal psi_star variables should match the user's variable units when possible,
# but if no reference quantities are set, use unitless variables to avoid strict mode errors.
psi_units = uw.get_units(psi_fn)
# Check if the model can handle units (has reference quantities set)
model = uw.get_default_model()
if psi_units is not None and not model.has_units():
# Model doesn't have reference quantities - don't propagate units to internal vars
psi_units = None
for i in range(order):
self.psi_star.append(
uw.discretisation.MeshVariable(
f"psi_star_sl_{self.instance_number}_{i}",
self.mesh,
vtype=vtype,
degree=self.degree,
continuous=self.continuous,
varsymbol=rf"{{ {varsymbol}^{{ {'*'*(i+1)} }} }}",
units=psi_units, # Inherit units from psi_fn (or None if model has no units)
)
)
# BDF/AM coefficient UWexpressions — routed through PetscDS constants[]
self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number)
self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number)
# Initialise to order-1 values
_update_bdf_values(self._bdf_coeffs, 1, None, [])
_update_am_values(self._am_coeffs, 1, 0.5)
# Working variable that has a potentially different discretisation from psi_star
# We project from this to psi_star and we use this variable to define the
# advection sample points
self._workVar = uw.discretisation.MeshVariable(
f"W_{self.instance_number}_{i}",
self.mesh,
vtype=vtype,
degree=self.swarm_degree,
continuous=self.swarm_continuous,
varsymbol=rf"{{ {varsymbol}^\nabla }}",
units=psi_units, # Inherit units from psi_fn
)
# We just need one swarm since this is inherently a sequential operation
nswarm = uw.swarm.NodalPointSwarm(self._workVar, verbose)
self._nswarm_psi = nswarm
# The projection operator for mapping swarm values to the mesh - needs to be different for
# each variable type, unfortunately ...
if vtype == uw.VarType.SCALAR:
self._psi_star_projection_solver = uw.systems.solvers.SNES_Projection(
self.mesh, self.psi_star[0], verbose=False
)
elif vtype == uw.VarType.VECTOR:
self._psi_star_projection_solver = uw.systems.solvers.SNES_Vector_Projection(
self.mesh,
self.psi_star[0],
verbose=False,
)
elif vtype == uw.VarType.SYM_TENSOR or vtype == uw.VarType.TENSOR:
import math
dim = self.mesh.dim
if vtype == uw.VarType.SYM_TENSOR:
Nc = math.comb(dim + 1, 2)
self._psi_star_indep_indices = [
(i, j) for i in range(dim) for j in range(i, dim)
]
else:
Nc = dim * dim
self._psi_star_indep_indices = [
(i, j) for i in range(dim) for j in range(dim)
]
self._psi_star_flat_var = uw.discretisation.MeshVariable(
f"psi_star_flat_slcn_{self.instance_number}",
self.mesh,
(1, Nc),
vtype=uw.VarType.MATRIX,
degree=degree,
continuous=continuous,
varsymbol=r"{\psi^{*}_{\mathrm{flat}}}",
)
self._psi_star_projection_solver = uw.systems.solvers.SNES_MultiComponent_Projection(
self.mesh,
u_Field=self._psi_star_flat_var,
n_components=Nc,
degree=degree,
verbose=False,
)
self._psi_star_use_multicomponent = True
# We should find a way to add natural bcs here
# (self.Unknowns.u carried as a symbol from solver to solver)
if getattr(self, '_psi_star_use_multicomponent', False):
import sympy
indep = self._psi_star_indep_indices
fn = self._workVar.sym
row = sympy.Matrix([[fn[i, j] for (i, j) in indep]])
self._psi_star_projection_solver.uw_function = row
else:
self._psi_star_projection_solver.uw_function = self._workVar.sym
self._psi_star_projection_solver.bcs = bcs
self._psi_star_projection_solver.smoothing = smoothing
self._smoothing = smoothing
self.I = uw.maths.Integral(mesh, None)
return
@property
def psi_fn(self):
r"""Current symbolic expression :math:`\psi` being tracked."""
return self._psi_fn
@psi_fn.setter
def psi_fn(self, new_fn):
"""Set the tracked expression."""
self._psi_fn = new_fn
if getattr(self, '_psi_star_use_multicomponent', False):
import sympy
indep = self._psi_star_indep_indices
row = sympy.Matrix([[new_fn[i, j] for (i, j) in indep]])
self._psi_star_projection_solver.uw_function = row
else:
self._psi_star_projection_solver.uw_function = self._psi_fn
return
def _object_viewer(self):
from IPython.display import Latex, Markdown, display
super()._object_viewer()
display(Latex(rf"$\quad$History steps = {self.order}"))
@property
def effective_order(self):
"""Current effective BDF order, accounting for history startup.
For BDF order k, k distinct history values are needed. During
startup, ``effective_order`` ramps from 1 to ``self.order`` as
successive solves populate the history slots with distinct values.
"""
# BDF-k requires k completed solves to have k distinct history values.
# With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves.
return min(self.order, max(1, self._n_solves_completed))
[docs]
def initialise_history(self):
r"""Initialize all history slots to the current value of :math:`\psi`.
Called automatically on the first ``update_pre_solve``. Can also
be called manually after setting initial conditions.
"""
# Evaluate psi_fn at psi_star node positions and store in psi_star[0]
from underworld3.utilities.unit_aware_array import UnitAwareArray
coords = self.psi_star[0].coords
if hasattr(coords, "magnitude"):
coords_nd = uw.non_dimensionalise(coords)
if isinstance(coords_nd, UnitAwareArray):
coords_nd = np.array(coords_nd)
elif hasattr(coords_nd, 'magnitude'):
coords_nd = coords_nd.magnitude
else:
coords_nd = coords
try:
eval_result = uw.function.evaluate(self.psi_fn, coords_nd)
psi_units = self.psi_star[0].units
if psi_units is not None and not isinstance(eval_result, UnitAwareArray):
eval_result = UnitAwareArray(eval_result, units=psi_units)
self.psi_star[0].array[...] = eval_result
except Exception:
if getattr(self, '_psi_star_use_multicomponent', False):
import sympy
indep = self._psi_star_indep_indices
row = sympy.Matrix([[self.psi_fn[i, j] for (i, j) in indep]])
self._psi_star_projection_solver.uw_function = row
self._psi_star_projection_solver.smoothing = 0.0
self._psi_star_projection_solver.solve()
# Fan out flat result to tensor psi_star[0]
for k, (i, j) in enumerate(indep):
vals = self._psi_star_flat_var.array[:, 0, k]
self.psi_star[0].array[:, i, j] = vals
if i != j:
self.psi_star[0].array[:, j, i] = vals
else:
self._psi_star_projection_solver.uw_function = self.psi_fn
self._psi_star_projection_solver.smoothing = 0.0
self._psi_star_projection_solver.solve()
# Copy to all other history slots
for i in range(1, self.order):
self.psi_star[i].array[...] = self.psi_star[0].array[...]
self._history_initialised = True
return
[docs]
def initiate_history_fn(self):
"""Deprecated: use ``initialise_history`` instead."""
self.initialise_history()
[docs]
def update(
self,
dt: float,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
dt_physical: Optional = None,
):
"""Update history (alias for ``update_pre_solve``)."""
self.update_pre_solve(dt, evalf, verbose, dt_physical)
return
[docs]
def update_post_solve(
self,
dt: float,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
dt_physical: Optional[float] = None,
):
"""Post-solve: record timestep and increment solve counter."""
self._dt = dt
# Record timestep history for variable-dt BDF
for i in range(self.order - 1, 0, -1):
self._dt_history[i] = self._dt_history[i - 1]
self._dt_history[0] = dt
if self._n_solves_completed < self.order:
self._n_solves_completed += 1
return
[docs]
def update_pre_solve(
self,
dt: float,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
dt_physical: Optional[float] = None,
store_result: Optional[bool] = True,
):
"""Sample upstream values along characteristics before solve.
On the first call, automatically initialises history from the
current field values so that bdf() returns zero on the first step.
Parameters
----------
store_result : bool, optional
If True (default), evaluate psi_fn at current positions and store
in psi_star[0] before advection — the standard DDt behaviour.
If False, skip this step and the history shift: only advect the
existing psi_star levels upstream. Used by VE_Stokes where
psi_star[0] already contains the projected actual stress from
the previous solve.
"""
self._dt = dt
if not self._history_initialised:
self.initialise_history()
# Update coefficient values for current effective_order and dt
_update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history)
_update_am_values(self._am_coeffs, self.effective_order, 0.5)
## Progress from the oldest part of the history
# 1. Copy the stored values down the chain in preparation for the next timestep
# The history term is the nodel value of psi_fn offset back along the characteristics
# according to the timestep.
# That is:
#
# - psi_star[0] is the current value of psi_fn, sampled
# at the location of the nodes in their previous position at t-\Delta t
#
# - psi_star[1] is the value of psi_star[0] from the previous timestep
# sampled at the location of the nodes at t - \Delta t. (note this is approximately
# equivalent to the value of psi_star[0] at t - 2\Delta t)
#
# - psi_star[2] etc if required ...
#
# First we copy the history, then we sample can sample upstream values
if dt_physical is not None:
phi = sympy.Min(1, dt / dt_physical)
else:
phi = sympy.sympify(1)
if store_result:
for i in range(self.order - 1, 0, -1):
self.psi_star[i].array[...] = (
phi * self.psi_star[i - 1].array[...] + (1 - phi) * self.psi_star[i].array[...]
)
# 2. Compute the current value of psi_fn which we store in psi_star[0]
# Note the need to do a try/except to handle unsupported evaluations
# (e.g. of derivatives)
#
# When store_result=False (e.g. VE stress history), skip this step —
# psi_star[0] already contains the projected actual stress from
# the previous solve and we want to advect *that*, not the flux.
# CRITICAL FIX (2025-11-28): Handle coordinates correctly for unit-aware mode.
# Previous bug: extracting .magnitude gives METERS (e.g., 1000000), but:
# - mesh.get_closest_cells() expects [0-1] non-dimensional coords
# - evaluate() assumes plain numpy is [0-1] non-dimensional
# Solution: use uw.non_dimensionalise() for proper conversion, OR pass
# unit-aware coords to evaluate() which handles conversion internally.
from underworld3.utilities.unit_aware_array import UnitAwareArray
psi_star_0_coords = self.psi_star[0].coords
# For mesh internal operations, need non-dimensional [0-1] coordinates
if hasattr(psi_star_0_coords, "magnitude"):
# Unit-aware coords - need to non-dimensionalize (not just extract magnitude!)
psi_star_0_coords_nd = uw.non_dimensionalise(psi_star_0_coords)
# Extract to plain numpy for mesh operations
if isinstance(psi_star_0_coords_nd, UnitAwareArray):
psi_star_0_coords_nd = np.array(psi_star_0_coords_nd)
elif hasattr(psi_star_0_coords_nd, 'magnitude'):
psi_star_0_coords_nd = psi_star_0_coords_nd.magnitude
else:
psi_star_0_coords_nd = np.array(psi_star_0_coords_nd)
else:
# Plain numpy - assume already non-dimensional
psi_star_0_coords_nd = psi_star_0_coords
cellid = self.mesh.get_closest_cells(
psi_star_0_coords_nd,
)
# Move slightly within the chosen cell to avoid edge effects
centroid_coords = self.mesh._centroids[cellid]
shift = 0.001
node_coords_nd = (1.0 - shift) * psi_star_0_coords_nd[:, :] + shift * centroid_coords[
:, :
]
if store_result:
try:
# Use shifted ND coords to avoid quad mesh boundary issues
# node_coords_nd is slightly shifted toward cell centroids
# evaluate() treats plain numpy as ND [0-1] coordinates
eval_result = uw.function.evaluate(
self.psi_fn,
node_coords_nd,
evalf=evalf,
)
# Wrap result with units if psi_star has units but eval didn't return UnitAwareArray
psi_star_units = self.psi_star[0].units
if psi_star_units is not None and not isinstance(eval_result, UnitAwareArray):
eval_result = UnitAwareArray(eval_result, units=psi_star_units)
self.psi_star[0].array[...] = eval_result
except Exception:
# Fallback to projection solver for expressions that can't be directly evaluated
# (e.g., containing derivatives)
self._psi_star_projection_solver.uw_function = self.psi_fn
self._psi_star_projection_solver.smoothing = 0.0
self._psi_star_projection_solver.solve(verbose=verbose)
# 3. Compute the upstream values from the psi_fn
# We use the u_star variable as a working value here so we have to work backwards
# so we don't over-write the history terms
#
# Convert dt to model units for numerical arithmetic
# (after symbolic logic that may use dt with units)
# Note: uw is already imported at module level (line 7)
model = uw.get_default_model()
# DIAGNOSTIC: Capture information about the unit system
coords_template = self.psi_star[0].coords
has_units = hasattr(coords_template, "magnitude") or hasattr(coords_template, "_magnitude")
# Maintain unit system consistency: either keep everything with units or convert to non-dimensional
if has_units:
# Physical coordinate system with units
# dt must be converted to base SI seconds so that dt * velocity(m/s) = distance(m)
if hasattr(dt, "to"): # It's a Pint quantity
dt_for_calc = dt.to("second") # Convert to seconds (still a quantity)
else:
# If dt is already a dimensionless number, treat it as seconds
dt_for_calc = dt
else:
# Non-dimensional coordinate system - convert dt to non-dimensional
# CRITICAL: Actually non-dimensionalize the timestep!
if hasattr(dt, "magnitude") or hasattr(dt, "value"):
# dt has units - non-dimensionalize it
dt_nondim = uw.non_dimensionalise(dt, model)
# Extract the dimensionless value
if hasattr(dt_nondim, "magnitude"):
dt_for_calc = float(dt_nondim.magnitude)
elif hasattr(dt_nondim, "value"):
dt_for_calc = float(dt_nondim.value)
else:
dt_for_calc = float(dt_nondim)
else:
# Already dimensionless
dt_for_calc = dt
for i in range(self.order - 1, -1, -1):
# 2nd order update along characteristics
# Use shifted ND coords to avoid quad mesh boundary issues
# node_coords_nd is slightly shifted toward cell centroids (lines 703-709)
# evaluate() treats plain numpy as ND [0-1] coordinates
v_result = uw.function.evaluate(
self.V_fn,
node_coords_nd,
)
# CRITICAL: Preserve UnitAwareArray through slicing
# Slicing can sometimes return plain numpy views - need to preserve wrapper
from underworld3.utilities.unit_aware_array import UnitAwareArray
if isinstance(v_result, UnitAwareArray):
# Slice and rewrap to preserve units
v_at_node_pts = v_result[:, 0, :]
if not isinstance(v_at_node_pts, UnitAwareArray):
# Slicing lost the wrapper - rewrap it
v_at_node_pts = UnitAwareArray(v_at_node_pts, units=v_result.units)
else:
v_at_node_pts = v_result[:, 0, :]
# Non-dimensionalize velocities when working with dimensionless coordinates
# This prevents dimensional mismatch: velocities in m/s mixed with coords in [0,1]
# CRITICAL: evaluate now returns UnitAwareArray with units attached
# Check if velocities already have units before trying to add them manually
if not has_units:
# Coordinates are dimensionless - need to non-dimensionalize velocities too
if isinstance(v_at_node_pts, UnitAwareArray):
# Velocities already have units from evaluate - just non-dimensionalize
v_nondim = uw.non_dimensionalise(v_at_node_pts, model)
# Extract numpy array for dimensionless calculation
if isinstance(v_nondim, UnitAwareArray):
v_at_node_pts = np.array(v_nondim)
elif hasattr(v_nondim, "value"):
v_at_node_pts = v_nondim.value
else:
v_at_node_pts = v_nondim
else:
# Velocities don't have units - try to add them manually (legacy path)
v_units = uw.get_units(self.V_fn)
if v_units and v_units != "dimensionless":
v_with_units = UnitAwareArray(v_at_node_pts, units=v_units)
v_nondim = uw.non_dimensionalise(v_with_units, model)
if isinstance(v_nondim, UnitAwareArray):
v_at_node_pts = np.array(v_nondim)
elif hasattr(v_nondim, "value"):
v_at_node_pts = v_nondim.value
else:
v_at_node_pts = v_nondim
else:
# Dimensional mode - ensure velocities have units
# CRITICAL FIX (2025-11-27): Variable data is stored NON-DIMENSIONALLY.
# We must DIMENSIONALIZE (not just wrap) the values before dimensional arithmetic.
# Previous bug: wrapping 0.01 (ND) with cm/yr gave 0.01 cm/yr instead of 1 cm/yr.
if not isinstance(v_at_node_pts, UnitAwareArray):
v_units = uw.get_units(self.V_fn)
if v_units and v_units != "dimensionless":
# Re-dimensionalize using the scaling system
if uw.is_nondimensional_scaling_active():
from underworld3.scaling import dimensionalise
# dimensionalise(nd_value, units) -> value * scale in those units
v_dimensional = dimensionalise(v_at_node_pts, v_units)
v_at_node_pts = UnitAwareArray(v_dimensional.magnitude, units=v_dimensional.units)
else:
# No scaling active - assume values are already dimensional
v_at_node_pts = UnitAwareArray(v_at_node_pts, units=v_units)
# Get coordinates
coords = self.psi_star[i].coords
# CRITICAL: When working in dimensionless mode, extract coords to plain arrays
# to match the dimensionless velocities (otherwise unit mismatch occurs)
from underworld3.utilities.unit_aware_array import UnitAwareArray
if not has_units and isinstance(coords, UnitAwareArray):
# Extract to plain numpy for dimensionless arithmetic
coords = np.array(coords)
# CRITICAL (2025-11-27): Multiply velocity FIRST so UnitAwareArray.__mul__ handles it.
# If we do `dt_for_calc * v_at_node_pts`, Pint handles it and loses UnitAwareArray units.
mid_pt_coords = coords - v_at_node_pts * (0.5 * dt_for_calc)
# Clamp midpoint coordinates to the domain boundary
if self.mesh.return_coords_to_bounds is not None:
mid_pt_coords = self.mesh.return_coords_to_bounds(mid_pt_coords)
v_mid_result = uw.function.global_evaluate(
self.V_fn,
mid_pt_coords,
)
# CRITICAL: Preserve UnitAwareArray through slicing
if isinstance(v_mid_result, UnitAwareArray):
# Slice and rewrap to preserve units
v_at_mid_pts = v_mid_result[:, 0, :]
if not isinstance(v_at_mid_pts, UnitAwareArray):
# Slicing lost the wrapper - rewrap it
v_at_mid_pts = UnitAwareArray(v_at_mid_pts, units=v_mid_result.units)
else:
v_at_mid_pts = v_mid_result[:, 0, :]
# Non-dimensionalize mid-point velocities when working with dimensionless coordinates
# CRITICAL: global_evaluate now returns UnitAwareArray with units attached
# Check if velocities already have units before trying to add them manually
if not has_units:
# Coordinates are dimensionless - need to non-dimensionalize velocities too
if isinstance(v_at_mid_pts, UnitAwareArray):
# Velocities already have units from global_evaluate - just non-dimensionalize
v_nondim = uw.non_dimensionalise(v_at_mid_pts, model)
# Extract numpy array for dimensionless calculation
if isinstance(v_nondim, UnitAwareArray):
v_at_mid_pts = np.array(v_nondim)
elif hasattr(v_nondim, "value"):
v_at_mid_pts = v_nondim.value
else:
v_at_mid_pts = v_nondim
else:
# Velocities don't have units - try to add them manually (legacy path)
v_units = uw.get_units(self.V_fn)
if v_units and v_units != "dimensionless":
v_with_units = UnitAwareArray(v_at_mid_pts, units=v_units)
v_nondim = uw.non_dimensionalise(v_with_units, model)
if isinstance(v_nondim, UnitAwareArray):
v_at_mid_pts = np.array(v_nondim)
elif hasattr(v_nondim, "value"):
v_at_mid_pts = v_nondim.value
else:
v_at_mid_pts = v_nondim
else:
# Dimensional mode - ensure velocities have units
# CRITICAL: If V_fn doesn't have unit metadata, evaluate() returns plain numpy
# We need to manually wrap it with units for dimensional arithmetic to work
if not isinstance(v_at_mid_pts, UnitAwareArray):
v_units = uw.get_units(self.V_fn)
if v_units and v_units != "dimensionless":
# Wrap velocities with their proper units
v_at_mid_pts = UnitAwareArray(v_at_mid_pts, units=v_units)
# Calculate upstream coordinates: current position - velocity * timestep
end_pt_coords = coords - v_at_mid_pts * dt_for_calc
# Clamp upstream coordinates to the domain boundary
if self.mesh.return_coords_to_bounds is not None:
end_pt_coords = self.mesh.return_coords_to_bounds(end_pt_coords)
# Extract scalar from (1,1) Matrix for scalar variables
# MeshVariable.sym returns Matrix([[value]]) for scalars
expr_to_evaluate = self.psi_star[i].sym
if hasattr(expr_to_evaluate, 'shape') and expr_to_evaluate.shape == (1, 1):
expr_to_evaluate = expr_to_evaluate[0, 0]
# Evaluate psi_star at upstream coordinates
# global_evaluate now returns dimensional results (gateway fix 2025-11-28)
value_at_end_points = uw.function.global_evaluate(
expr_to_evaluate,
end_pt_coords,
)
# CRITICAL FIX (2025-11-27): If psi_star has units, ensure the assigned
# value also has units. global_evaluate may return plain arrays.
psi_star_units = self.psi_star[i].units
if psi_star_units is not None and not isinstance(value_at_end_points, UnitAwareArray):
value_at_end_points = UnitAwareArray(value_at_end_points, units=psi_star_units)
self.psi_star[i].array[...] = value_at_end_points
# disable this for now - Compute moments before update
if 0 and self.preserve_moments and self._workVar.num_components == 1:
self.I.fn = self.psi_star[i].sym[0]
Imean0 = self.I.evaluate()
self.I.fn = (self.psi_star[i].sym[0] - Imean0) ** 2
IL20 = np.sqrt(self.I.evaluate())
# disable this for now - Restore moments after update
if 0 and self.preserve_moments and self._workVar.num_components == 1:
self.I.fn = self.psi_star[i].sym[0]
Imean = self.I.evaluate()
self.I.fn = (self.psi_star[i].sym[0] - Imean) ** 2
IL2 = np.sqrt(self.I.evaluate())
# TODO: DELETE remove swarm.access / data, replace with direct array assignment
# with self.mesh.access(self.psi_star[i]):
# self.psi_star[i].data[...] += Imean0 - Imean
self.psi_star[i].array[...] += Imean0 - Imean
self.I.fn = (self.psi_star[i].sym[0] - Imean0) ** 2
IL2 = np.sqrt(self.I.evaluate())
# TODO: DELETE remove swarm.access / data, replace with direct array assignment
# with self.mesh.access(self.psi_star[i]):
# self.psi_star[i].data[...] = (
# self.psi_star[i].data[...] - Imean0
# ) * IL20 / IL2 + Imean0
self.psi_star[i].array[...] = (
self.psi_star[i].array[...] - Imean0
) * IL20 / IL2 + Imean0
return
@property
def bdf_coefficients(self):
"""Current BDF coefficients [c0, c1, ...] accounting for variable timesteps."""
return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs]
def bdf(self, order=None):
r"""Backward differentiation approximation of the time-derivative of :math:`\psi`.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. Values are updated each step in ``update_pre_solve``.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility).
"""
return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs]
def adams_moulton_flux(self, order=None):
r"""Adams-Moulton flux approximation for implicit time integration.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. Values are updated each step in ``update_pre_solve``.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility).
"""
return _build_weighted_sum(self._am_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
## Consider Deprecating this one - it is the same as the Lagrangian_Swarm but
## sets up the swarm for itself. This does not have a practical use-case - the swarm version
## is slower, more cumbersome, and less stable / accurate. The only reason to use
## it is if there is an existing swarm that we can re-purpose.
[docs]
class Lagrangian(uw_object):
r"""
Swarm-based Lagrangian history manager for material tracking.
Manages the update of a Lagrangian variable :math:`\psi` on a swarm
across timesteps. Creates and manages its own internal swarm for
tracking material properties through the flow.
.. math::
\psi_p^{t-n\Delta t} \leftarrow \psi_p^{t-(n-1)\Delta t}
\psi_p^{t-(n-1)\Delta t} \leftarrow \psi_p^{t-(n-2)\Delta t} \cdots
\psi_p^{t-\Delta t} \leftarrow \psi_p^{t}
The Lagrangian approach follows material points through the flow,
avoiding numerical diffusion in advection. History values are stored
on swarm variables with proxy mesh variables for solver integration.
Parameters
----------
mesh : underworld3.discretisation.Mesh
The computational mesh.
psi_fn : sympy.Function
The quantity to track (e.g., stress tensor ``stokes.stress``).
V_fn : sympy.Function
Velocity field for particle advection.
vtype : VarType
Variable type (SCALAR, VECTOR, SYM_TENSOR, etc.).
degree : int
Polynomial degree for proxy mesh variables.
continuous : bool
Whether proxy variables are continuous across elements.
varsymbol : str, default=r"u"
LaTeX symbol for display.
verbose : bool, default=False
Enable verbose output during updates.
bcs : list, default=[]
Boundary conditions (currently unused for swarm variables).
order : int, default=1
Number of history timesteps to store.
smoothing : float, default=0.0
Smoothing parameter for projections.
fill_param : int, default=3
Fill parameter for swarm population density.
Notes
-----
The Lagrangian method is ideal for:
- Viscoelastic stress history tracking
- Material property advection without diffusion
- Tracking compositional fields
The internal swarm is automatically advected during updates.
See Also
--------
SemiLagrangian : Mesh-based semi-Lagrangian with departure points.
Lagrangian_Swarm : For user-provided swarms.
"""
instances = (
0 # count how many of these there are in order to create unique private mesh variable ids
)
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
psi_fn: sympy.Function,
V_fn: sympy.Function,
vtype: uw.VarType,
degree: int,
continuous: bool,
varsymbol: Optional[str] = r"u",
verbose: Optional[bool] = False,
bcs=[],
order=1,
smoothing=0.0,
fill_param=3,
):
super().__init__()
# create a new swarm to manage here
dudt_swarm = uw.swarm.UWSwarm(mesh)
self.mesh = mesh
self.swarm = dudt_swarm
self.psi_fn = psi_fn
self.V_fn = V_fn
self.verbose = verbose
self.order = order
# History tracking: deferred initialization and effective order
self._history_initialised = False
self._n_solves_completed = 0
self._dt = None # current timestep (set by solver or update_pre_solve)
self._dt_history = [None] * order # previous timesteps for variable-dt BDF
psi_star = []
self.psi_star = psi_star
for i in range(order):
self.psi_star.append(
uw.swarm.SwarmVariable(
f"psi_star_sw_{self.instance_number}_{i}",
self.swarm,
vtype=vtype,
proxy_degree=degree,
proxy_continuous=continuous,
varsymbol=rf"{varsymbol}^{{ {'*'*(i+1)} }}",
)
)
# BDF/AM coefficient UWexpressions — routed through PetscDS constants[]
self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number)
self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number)
# Initialise to order-1 values
_update_bdf_values(self._bdf_coeffs, 1, None, [])
_update_am_values(self._am_coeffs, 1, 0.5)
dudt_swarm.populate(fill_param)
return
def _object_viewer(self):
from IPython.display import Latex, Markdown, display
super()._object_viewer()
## feedback on this instance
display(Latex(r"$\quad\psi = $ " + self.psi._repr_latex_()))
display(
Latex(
r"$\quad\Delta t_{\textrm{phys}} = $ "
+ sympy.sympify(self.dt_physical)._repr_latex_()
)
)
display(Latex(rf"$\quad$History steps = {self.order}"))
@property
def effective_order(self):
"""Current effective BDF order, accounting for history startup."""
# BDF-k requires k completed solves to have k distinct history values.
# With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves.
return min(self.order, max(1, self._n_solves_completed))
[docs]
def initialise_history(self):
r"""Initialize all history slots to the current value of :math:`\psi`.
Called automatically on the first ``update_pre_solve``. Can also
be called manually after setting initial conditions.
"""
psi_star_0 = self.psi_star[0]
with self.swarm.access(psi_star_0):
for i in range(psi_star_0.shape[0]):
for j in range(psi_star_0.shape[1]):
updated_psi = uw.function.evaluate(
self.psi_fn[i, j],
self.swarm.data,
)
psi_star_0[i, j].data[:] = updated_psi
# Copy to all other history slots
for k in range(1, self.order):
with self.swarm.access(self.psi_star[k]):
self.psi_star[k].data[...] = psi_star_0.data[...]
self._history_initialised = True
return
[docs]
def initiate_history_fn(self):
"""Deprecated: use ``initialise_history`` instead."""
self.initialise_history()
## Note: We may be able to eliminate this
## The SL updater and the Lag updater have
## different sequencing because of the way they
## update the history. It makes more sense for the
## full Lagrangian swarm to be updated after the solve
## and this means we have to grab the history values first.
[docs]
def update(
self,
dt: float,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
"""Update history (alias for ``update_post_solve``)."""
self.update_post_solve(dt, evalf, verbose)
return
[docs]
def update_pre_solve(
self,
dt: float,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
"""Pre-solve: auto-initialise history on first call."""
self._dt = dt
if not self._history_initialised:
self.initialise_history()
# Update coefficient values for current effective_order and dt
_update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history)
_update_am_values(self._am_coeffs, self.effective_order, 0.5)
return
[docs]
def update_post_solve(
self,
dt: float,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
"""Shift history chain and advect swarm after solve."""
self._dt = dt
# Record timestep history for variable-dt BDF
for i in range(self.order - 1, 0, -1):
self._dt_history[i] = self._dt_history[i - 1]
self._dt_history[0] = dt
for h in range(self.order - 1):
i = self.order - (h + 1)
# copy the information down the chain
if verbose:
print(f"Lagrange order = {self.order}")
print(f"Lagrange copying {i-1} to {i}")
self.psi_star[i].array[...] = self.psi_star[i - 1].array[...]
# Now update the swarm variable
psi_star_0 = self.psi_star[0]
with self.swarm.access(psi_star_0):
for i in range(psi_star_0.shape[0]):
for j in range(psi_star_0.shape[1]):
updated_psi = uw.function.evaluate(
self.psi_fn[i, j],
self.swarm.data,
evalf=evalf,
)
psi_star_0[i, j].data[:] = updated_psi
# Now update the swarm locations
self.swarm.advection(
self.V_fn,
delta_t=dt,
restore_points_to_domain_func=self.mesh.return_coords_to_bounds,
)
if self._n_solves_completed < self.order:
self._n_solves_completed += 1
@property
def bdf_coefficients(self):
"""Current BDF coefficients [c0, c1, ...] accounting for variable timesteps."""
return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs]
def bdf(self, order=None):
r"""Backward differentiation approximation of the time-derivative of :math:`\psi`.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. Values are updated each step in ``update_pre_solve``.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility).
"""
return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs]
def adams_moulton_flux(self, order=None):
r"""Adams-Moulton flux approximation for implicit time integration.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. Values are updated each step in ``update_pre_solve``.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility).
"""
return _build_weighted_sum(self._am_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs]
class Lagrangian_Swarm(uw_object):
r"""
Swarm-based Lagrangian history manager (user-provided swarm).
Manages the update of a Lagrangian variable :math:`\psi` on a user-supplied
swarm across timesteps:
.. math::
\psi_p^{t-n\Delta t} \leftarrow \psi_p^{t-(n-1)\Delta t}
\psi_p^{t-(n-1)\Delta t} \leftarrow \psi_p^{t-(n-2)\Delta t} \cdots
\psi_p^{t-\Delta t} \leftarrow \psi_p^{t}
Unlike ``Lagrangian``, this class uses a user-provided swarm rather than
creating its own. The swarm should already be populated and configured
for tracking material points.
Parameters
----------
swarm : underworld3.swarm.Swarm
User-provided swarm for material point tracking.
psi_fn : sympy.Function
The quantity to track (e.g., stress tensor from a solver).
vtype : underworld3.VarType
Variable type (SCALAR, VECTOR, SYM_TENSOR, etc.).
degree : int
Interpolation degree for proxy mesh variables.
continuous : bool
Whether proxy mesh variables should be continuous.
varsymbol : str, optional
LaTeX symbol for display (default ``"u"``).
verbose : bool, optional
Enable verbose output (default ``False``).
bcs : list, optional
Boundary conditions (currently unused, default ``[]``).
order : int, optional
Order of time integration (1-3) (default ``1``).
smoothing : float, optional
Smoothing parameter (default ``0.0``).
step_averaging : int, optional
Number of steps for history averaging (default ``2``).
Attributes
----------
mesh : underworld3.discretisation.Mesh
Reference to the computational mesh (from swarm).
swarm : underworld3.swarm.Swarm
The user-provided swarm.
psi_fn : sympy.Function
Symbolic expression for the tracked quantity.
order : int
Order of BDF integration.
step_averaging : int
Number of steps for averaging (affects BDF scaling).
psi_star : list
History values :math:`\psi^*, \psi^{**}, \ldots` as swarm variables.
Notes
-----
Key differences from ``Lagrangian`` class:
- Uses user-provided swarm (not internally created)
- Swarm advection is NOT performed (user's responsibility)
- Step averaging for smoothing history updates
- Suitable when swarm is shared between multiple history managers
The ``step_averaging`` parameter scales the BDF formula to account for
updates that occur over multiple sub-steps within a timestep.
See Also
--------
Lagrangian : Creates and manages its own swarm with advection.
SemiLagrangian : Nodal-swarm approach for advection-dominated problems.
Eulerian : Pure mesh-based history (no particle tracking).
"""
instances = (
0 # count how many of these there are in order to create unique private mesh variable ids
)
[docs]
@timing.routine_timer_decorator
def __init__(
self,
swarm: uw.swarm.Swarm,
psi_fn: sympy.Function,
vtype: uw.VarType,
degree: int,
continuous: bool,
varsymbol: Optional[str] = r"u",
verbose: Optional[bool] = False,
bcs=[],
order=1,
smoothing=0.0,
step_averaging=2,
):
super().__init__()
self.mesh = swarm.mesh
self.swarm = swarm
self.psi_fn = psi_fn
self.verbose = verbose
self.order = order
self.step_averaging = step_averaging
# History tracking: deferred initialization and effective order
self._history_initialised = False
self._n_solves_completed = 0
self._dt = None # current timestep (set by solver or update_pre_solve)
self._dt_history = [None] * order # previous timesteps for variable-dt BDF
psi_star = []
self.psi_star = psi_star
for i in range(order):
self.psi_star.append(
uw.swarm.SwarmVariable(
f"psi_star_sw_{self.instance_number}_{i}",
self.swarm,
vtype=vtype,
proxy_degree=degree,
proxy_continuous=continuous,
varsymbol=rf"{varsymbol}^{{ {'*'*(i+1)} }}",
)
)
# BDF/AM coefficient UWexpressions — routed through PetscDS constants[]
self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number)
self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number)
# Initialise to order-1 values
_update_bdf_values(self._bdf_coeffs, 1, None, [])
_update_am_values(self._am_coeffs, 1, 0.5)
return
def _object_viewer(self):
from IPython.display import Latex, Markdown, display
super()._object_viewer()
## feedback on this instance
display(Latex(r"$\quad\psi = $ " + self.psi._repr_latex_()))
display(
Latex(
r"$\quad\Delta t_{\textrm{phys}} = $ "
+ sympy.sympify(self.dt_physical)._repr_latex_()
)
)
display(Latex(rf"$\quad$History steps = {self.order}"))
@property
def effective_order(self):
"""Current effective BDF order, accounting for history startup."""
# BDF-k requires k completed solves to have k distinct history values.
# With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves.
return min(self.order, max(1, self._n_solves_completed))
[docs]
def initialise_history(self):
r"""Initialize all history slots to the current value of :math:`\psi`.
Called automatically on the first ``update_pre_solve``. Can also
be called manually after setting initial conditions.
"""
psi_star_0 = self.psi_star[0]
with self.swarm.access(psi_star_0):
for i in range(psi_star_0.shape[0]):
for j in range(psi_star_0.shape[1]):
updated_psi = uw.function.evaluate(
self.psi_fn[i, j],
self.swarm.data,
)
psi_star_0[i, j].data[:] = updated_psi
# Copy to all other history slots
for k in range(1, self.order):
with self.swarm.access(self.psi_star[k]):
self.psi_star[k].data[...] = psi_star_0.data[...]
self._history_initialised = True
return
[docs]
def initiate_history_fn(self):
"""Deprecated: use ``initialise_history`` instead."""
self.initialise_history()
## Note: We may be able to eliminate this
## The SL updater and the Lag updater have
## different sequencing because of the way they
## update the history. It makes more sense for the
## full Lagrangian swarm to be updated after the solve
## and this means we have to grab the history values first.
[docs]
def update(
self,
dt: float,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
"""Update history (alias for ``update_post_solve``)."""
self.update_post_solve(dt, evalf, verbose)
return
[docs]
def update_pre_solve(
self,
dt: float,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
"""Pre-solve: auto-initialise history on first call."""
self._dt = dt
if not self._history_initialised:
self.initialise_history()
# Update coefficient values for current effective_order and dt
_update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history)
_update_am_values(self._am_coeffs, self.effective_order, 0.5)
return
[docs]
def update_post_solve(
self,
dt: float,
evalf: Optional[bool] = False,
verbose: Optional[bool] = False,
):
r"""Shift history chain and evaluate current :math:`\psi` on swarm."""
self._dt = dt
# Record timestep history for variable-dt BDF
for i in range(self.order - 1, 0, -1):
self._dt_history[i] = self._dt_history[i - 1]
self._dt_history[0] = dt
for h in range(self.order - 1):
i = self.order - (h + 1)
# copy the information down the chain
if verbose:
print(f"Lagrange swarm order = {self.order}", flush=True)
print(
f"Mesh interpolant order = {self.psi_star[0]._meshVar.degree}",
flush=True,
)
print(f"Lagrange swarm copying {i-1} to {i}", flush=True)
with self.swarm.access(self.psi_star[i]):
self.psi_star[i].data[...] = self.psi_star[i - 1].data[...]
phi = 1 / self.step_averaging
psi_star_0 = self.psi_star[0]
with self.swarm.access(psi_star_0):
for i in range(psi_star_0.shape[0]):
for j in range(psi_star_0.shape[1]):
updated_psi = uw.function.evaluate(
self.psi_fn[i, j],
self.swarm.data,
evalf=evalf,
)
psi_star_0[i, j].data[:] = (
phi * updated_psi + (1 - phi) * psi_star_0[i, j].data[:]
)
if self._n_solves_completed < self.order:
self._n_solves_completed += 1
return
@property
def bdf_coefficients(self):
"""Current BDF coefficients [c0, c1, ...] accounting for variable timesteps."""
return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs]
def bdf(self, order=None):
r"""Backward differentiation approximation of the time-derivative of :math:`\psi`.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. Values are updated each step in ``update_pre_solve``.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility).
"""
return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs]
def adams_moulton_flux(self, order=None):
r"""Adams-Moulton flux approximation for implicit time integration.
Returns a fixed-structure symbolic expression using UWexpression
coefficients. Values are updated each step in ``update_pre_solve``.
Parameters
----------
order : int, optional
Ignored (kept for API compatibility).
"""
return _build_weighted_sum(self._am_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])