Source code for underworld3.systems.ddt

r"""
Time derivative approximations for transient problems.

This module provides classes for computing time derivatives (D/Dt) using
various numerical schemes. These are used within solvers to discretize the
time dimension of PDEs.

All DDt classes share a common interface:

- ``update(dt, ...)`` — alias for ``update_pre_solve``
- ``update_pre_solve(dt, ...)`` — called before the PDE solve
- ``update_post_solve(dt, ...)`` — called after the PDE solve
- ``bdf(order)`` — backward differentiation formula (returns Δψ, divide by Δt for rate)
- ``adams_moulton_flux(order)`` — weighted flux for implicit integration

**Symbolic** -- Pure symbolic history, no mesh storage. Used for flux
tracking in SNES_Diffusion where the flux expression is a SymPy tree.

**Eulerian** -- Fixed-grid time derivative with optional grid-based
advection. When ``V_fn`` is provided, ``update_pre_solve`` applies an
explicit advection correction so that ``bdf()`` approximates the full
material derivative.

**SemiLagrangian** -- Characteristic-based D/Dt via departure points.
Traces backward along velocity field to sample upstream values.
Unconditionally stable for advection (no CFL constraint).

**Lagrangian** -- Full particle-following D/Dt. Creates and manages
its own swarm, advected during ``update_post_solve``.

**Lagrangian_Swarm** -- Specialized swarm-based Lagrangian using a
user-provided swarm. Swarm advection is the user's responsibility.

Notes
-----
The choice of time derivative scheme affects accuracy, stability, and
computational cost:

- **Eulerian**: Fixed grid, simple, best for weak-to-moderate advection
  or purely diffusive problems (e.g. Richards equation with no transport).
- **Eulerian (with V_fn)**: Adds explicit advection correction — subject
  to CFL but suitable when velocity is moderate.
- **Semi-Lagrangian**: Best for advection-dominated problems (high Péclet).
  Unconditionally stable but breaks down when v → 0.
- **Lagrangian**: Tracks material properties without numerical diffusion.
  Best for purely advected quantities (e.g. composition).

See Also
--------
underworld3.systems.solvers : PDE solvers using these time derivatives.
"""

import sympy
from sympy import sympify
import numpy as np

from typing import Optional, Callable, Union

import underworld3 as uw
from underworld3 import VarType

import underworld3.timing as timing
from underworld3.utilities._api_tools import uw_object

from petsc4py import PETSc


def _as_float(value):
    """Extract a plain float from various numeric types (Pint, UWQuantity, etc.)."""
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if hasattr(value, "magnitude"):
        return float(value.magnitude)
    if hasattr(value, "value"):
        return float(value.value)
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _bdf_coefficients(order, dt_current, dt_history):
    r"""Compute BDF coefficients, handling variable timesteps.

    For uniform timesteps the classical BDF-k coefficients are returned
    as exact ``sympy.Rational`` values.  When the timestep ratio differs
    from unity the variable-step BDF-2 formula is used (BDF-3 falls back
    to variable-step BDF-2 when the ratio exceeds 5 %).

    Parameters
    ----------
    order : int
        BDF order (1, 2, or 3).
    dt_current : float or None
        Current timestep :math:`\Delta t_n`.
    dt_history : list
        Previous timesteps ``[dt_{n-1}, dt_{n-2}, ...]``.

    Returns
    -------
    list of sympy.Basic
        Coefficients ``[c0, c1, c2, ...]`` such that

        .. math::

            \Delta\psi_{\mathrm{BDF}} = c_0\,\psi^{n+1}
                + c_1\,\psi^n + c_2\,\psi^{n-1} + \cdots

    Notes
    -----
    **BDF-1** (backward Euler) always returns ``[1, -1]``.

    **BDF-2** with variable step uses
    :math:`r = \Delta t_n / \Delta t_{n-1}`:

    .. math::

        c_0 = \frac{1+2r}{1+r}, \quad
        c_1 = -(1+r), \quad
        c_2 = \frac{r^2}{1+r}

    which reduces to :math:`[3/2,\;-2,\;1/2]` when :math:`r=1`.

    **BDF-3** uses the constant-step formula when all ratios are within
    5 % of unity; otherwise it falls back to variable-step BDF-2.
    """
    if order <= 1:
        return [sympy.Integer(1), sympy.Integer(-1)]

    dt_n = _as_float(dt_current)
    dt_nm1 = (
        _as_float(dt_history[0])
        if len(dt_history) > 0 and dt_history[0] is not None
        else None
    )

    if order == 2:
        if dt_n is not None and dt_nm1 is not None and dt_nm1 > 0:
            r = dt_n / dt_nm1
            if abs(r - 1.0) < 1e-12:
                # Exactly constant dt — use exact rationals
                return [
                    sympy.Rational(3, 2),
                    sympy.Integer(-2),
                    sympy.Rational(1, 2),
                ]
            else:
                # Variable dt
                return [
                    sympy.sympify((1 + 2 * r) / (1 + r)),
                    sympy.sympify(-(1 + r)),
                    sympy.sympify(r**2 / (1 + r)),
                ]
        else:
            # No usable history — constant-dt fallback
            return [
                sympy.Rational(3, 2),
                sympy.Integer(-2),
                sympy.Rational(1, 2),
            ]

    # order >= 3
    dt_nm2 = (
        _as_float(dt_history[1])
        if len(dt_history) > 1 and dt_history[1] is not None
        else None
    )
    if (
        dt_n is not None
        and dt_nm1 is not None
        and dt_nm2 is not None
        and dt_nm1 > 0
        and dt_nm2 > 0
    ):
        r1 = dt_n / dt_nm1
        r2 = dt_nm1 / dt_nm2
        if abs(r1 - 1.0) < 0.05 and abs(r2 - 1.0) < 0.05:
            # Approximately constant dt — standard BDF-3
            return [
                sympy.Rational(11, 6),
                sympy.Integer(-3),
                sympy.Rational(3, 2),
                sympy.Rational(-1, 3),
            ]
        else:
            # Variable dt — fall back to BDF-2 with variable coefficients
            return _bdf_coefficients(2, dt_current, dt_history)
    else:
        # Insufficient history — constant-dt BDF-3
        return [
            sympy.Rational(11, 6),
            sympy.Integer(-3),
            sympy.Rational(3, 2),
            sympy.Rational(-1, 3),
        ]


# ============================================================================
# BDF/AM Coefficient Expressions
# ============================================================================
#
# These helpers create UWexpression coefficient objects and build fixed-structure
# symbolic expressions for bdf() and adams_moulton_flux(). The coefficients are
# routed through PETSc's constants[] array by the JIT compiler, so changing
# effective_order or variable dt only requires PetscDSSetConstants() — no
# recompilation.
# ============================================================================

from underworld3.function.expressions import UWexpression as _UWexpression


def _create_coefficients(order, prefix, instance_id):
    """Create UWexpression objects for BDF or AM coefficients.

    Parameters
    ----------
    order : int
        Maximum order (number of history terms). Creates order+1 coefficients.
    prefix : str
        LaTeX prefix for display (e.g. "c^{BDF}" or "a^{AM}").
    instance_id : int
        Unique ID to disambiguate coefficients from different DDt instances.

    Returns
    -------
    list of UWexpression
        Coefficient expressions initialised to 0.0.
    """
    coeffs = []
    for i in range(order + 1):
        c = _UWexpression(
            rf"{prefix}_{{{i},{instance_id}}}",
            sym=0.0,
            description=f"{prefix} coefficient {i} (DDt instance {instance_id})",
            _unique_name_generation=True,
        )
        coeffs.append(c)
    return coeffs


def _update_bdf_values(coeffs, effective_order, dt, dt_history):
    """Update BDF coefficient UWexpression values for current state.

    Sets active coefficients from _bdf_coefficients() and zeroes the rest.
    """
    values = _bdf_coefficients(effective_order, dt, dt_history)
    for i, v in enumerate(values):
        coeffs[i].sym = float(v)
    for i in range(len(values), len(coeffs)):
        coeffs[i].sym = 0.0


def _update_am_values(coeffs, effective_order, theta=0.5):
    """Update Adams-Moulton coefficient UWexpression values for current state.

    AM coefficients for each order (constant-dt formulas):
    - Order 0: [1]
    - Order 1: [theta, 1-theta]
    - Order 2: [5/12, 8/12, -1/12]
    - Order 3: [9/24, 19/24, -5/24, 1/24]
    """
    if effective_order <= 0:
        values = [1.0]
    elif effective_order == 1:
        values = [float(theta), 1.0 - float(theta)]
    elif effective_order == 2:
        values = [5.0 / 12, 8.0 / 12, -1.0 / 12]
    elif effective_order >= 3:
        values = [9.0 / 24, 19.0 / 24, -5.0 / 24, 1.0 / 24]

    for i, v in enumerate(values):
        coeffs[i].sym = v
    for i in range(len(values), len(coeffs)):
        coeffs[i].sym = 0.0


def _build_weighted_sum(coeffs, psi_fn, psi_star_syms):
    """Build a fixed-structure weighted sum: c0*psi + c1*psi_star[0] + ...

    The symbolic structure includes all terms up to len(coeffs)-1.
    Inactive terms have coefficient=0 and vanish numerically.

    Parameters
    ----------
    coeffs : list of UWexpression
        Coefficient expressions (length = order + 1).
    psi_fn : sympy expression
        Current-time field expression.
    psi_star_syms : list of sympy expressions
        History term symbolic expressions (psi_star[i].sym or psi_star[i]).

    Returns
    -------
    sympy expression
        The weighted sum.
    """
    result = coeffs[0] * psi_fn
    for i in range(len(psi_star_syms)):
        if i + 1 < len(coeffs):
            result = result + coeffs[i + 1] * psi_star_syms[i]
    return result


[docs] class Symbolic(uw_object): r""" Symbolic history manager for time derivative approximations. Manages the update of a variable :math:`\psi` across timesteps. The history operator stores :math:`\psi` over several timesteps (given by ``order``) so that it can compute backward differentiation (BDF) or Adams-Moulton expressions. The history operator is defined as: .. math:: \psi_p^{t-n\Delta t} &\leftarrow \psi_p^{t-(n-1)\Delta t} \\ \psi_p^{t-(n-1)\Delta t} &\leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \\ \psi_p^{t-\Delta t} &\leftarrow \psi_p^{t} This is a purely symbolic history manager that operates on sympy expressions without mesh or swarm storage. It is useful for tracking symbolic expressions through time-stepping algorithms. Parameters ---------- psi_fn : sympy.Basic The sympy expression to track. Can be scalar or matrix form. theta : float, optional Implicitness parameter for Adams-Moulton order 1 (default ``0.5``). Values: 0 = explicit, 1 = implicit, 0.5 = Crank-Nicolson. varsymbol : str, optional LaTeX symbol for display (default ``r"\\psi"``). verbose : bool, optional Enable verbose output (default ``False``). bcs : list, optional Boundary conditions (default ``[]``). order : int, optional Order of time integration (1-3) (default ``1``). smoothing : float, optional Smoothing parameter (default ``0.0``). Notes ----- The ``Symbolic`` class is the base for understanding BDF and Adams-Moulton formulas without the complexity of mesh or swarm storage. It is primarily useful for: - Understanding time-stepping algorithm behavior - Debugging symbolic expressions in time-dependent problems - Prototyping before implementing with mesh/swarm storage For actual simulations, use ``Eulerian``, ``SemiLagrangian``, or ``Lagrangian`` which store history on computational meshes or swarms. See Also -------- Eulerian : Mesh-based history with BDF time-stepping. SemiLagrangian : Nodal-swarm approach for advection-dominated problems. Lagrangian : Swarm-based material tracking. """
[docs] @timing.routine_timer_decorator def __init__( self, psi_fn: sympy.Basic, # a sympy expression for ψ; can be scalar or matrix theta: Optional[float] = 0.5, varsymbol: Optional[str] = r"\psi", verbose: Optional[bool] = False, bcs=[], order: int = 1, smoothing: float = 0.0, ): super().__init__() self.theta = theta self.bcs = bcs self.verbose = verbose self.smoothing = smoothing self.order = order # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF # Ensure psi_fn is a sympy Matrix. if not isinstance(psi_fn, sympy.Matrix): try: psi_fn = sympy.Matrix(psi_fn) except Exception: psi_fn = sympy.Matrix([[psi_fn]]) self._psi_fn = psi_fn # stored with its native shape self._shape = psi_fn.shape # capture the shape # Set the display symbol for psi_fn and for the history variable. self._psi_fn_symbol = varsymbol # e.g. "\psi" self._psi_star_symbol = varsymbol + r"^\ast" # e.g. "\psi^\ast" # Create the history list: each element is a Matrix of shape _shape. self.psi_star = [sympy.zeros(*self._shape) for _ in range(order)] # BDF/AM coefficient UWexpressions — routed through PetscDS constants[] self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number) self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number) # Initialise to order-1 values _update_bdf_values(self._bdf_coeffs, 1, None, []) _update_am_values(self._am_coeffs, 1, self.theta) return
@property def psi_fn(self): r"""Current symbolic expression :math:`\psi` being tracked.""" return self._psi_fn @psi_fn.setter def psi_fn(self, new_fn): """Set the tracked symbolic expression.""" if not isinstance(new_fn, sympy.Matrix): try: new_fn = sympy.Matrix(new_fn) except Exception: new_fn = sympy.Matrix([[new_fn]]) # Optionally, one could check for matching shape; here we update both. self._psi_fn = new_fn self._shape = new_fn.shape return def _object_viewer(self): from IPython.display import Latex, display # Display the primary variable display(Latex(rf"$\quad {self._psi_fn_symbol} = {sympy.latex(self._psi_fn)}$")) # Display the history variable using the different symbol. history_latex = ", ".join([sympy.latex(elem) for elem in self.psi_star]) display(Latex(rf"$\quad {self._psi_star_symbol} = \left[{history_latex}\right]$")) @property def effective_order(self): """Current effective BDF order, accounting for history startup. For BDF order k, k distinct history values are needed. During startup, ``effective_order`` ramps from 1 to ``self.order`` as successive solves populate the history slots with distinct values. """ # BDF-k requires k completed solves to have k distinct history values. # With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves. return min(self.order, max(1, self._n_solves_completed))
[docs] def update_history_fn(self): r"""Copy current :math:`\psi` to the first history slot ``psi_star[0]``.""" # Update the first history element with a copy of the current ψ. self.psi_star[0] = self.psi_fn.copy()
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions. """ self.update_history_fn() # Propagate the initial history to all history steps. for i in range(1, self.order): self.psi_star[i] = self.psi_star[0].copy() self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
[docs] def update( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Update history (alias for ``update_pre_solve``).""" self.update_pre_solve(dt, evalf, verbose) return
[docs] def update_pre_solve( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Pre-solve update hook. Auto-initialises history on first call.""" self._dt = dt if not self._history_initialised: self.initialise_history() # Update coefficient values for current effective_order and dt _update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history) _update_am_values(self._am_coeffs, self.effective_order, self.theta) return
[docs] def update_post_solve( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): r"""Shift history chain after solve: :math:`\psi^{*n} \leftarrow \psi^{*(n-1)}`.""" self._dt = dt if verbose: print(f"Updating history for ψ = {self.psi_fn}", flush=True) # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt # Shift history: copy each element down the chain. for i in range(self.order - 1, 0, -1): self.psi_star[i] = self.psi_star[i - 1].copy() self.update_history_fn() if self._n_solves_completed < self.order: self._n_solves_completed += 1 return
@property def bdf_coefficients(self): """Current BDF coefficients [c0, c1, ...] accounting for variable timesteps.""" return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs] def bdf(self, order: Optional[int] = None): r"""Backward differentiation approximation of the time-derivative of ψ. Returns a fixed-structure symbolic expression using UWexpression coefficients. The coefficient values are updated each step in ``update_pre_solve`` — no JIT recompilation needed when the order ramps up or the timestep changes. Parameters ---------- order : int, optional Ignored (kept for API compatibility). The effective order is controlled by the coefficient values. """ return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, self.psi_star)
[docs] def adams_moulton_flux(self, order: Optional[int] = None): r"""Adams-Moulton flux approximation for implicit time integration. Returns a fixed-structure symbolic expression using UWexpression coefficients. Values are updated each step in ``update_pre_solve``. Parameters ---------- order : int, optional Ignored (kept for API compatibility). """ return _build_weighted_sum(self._am_coeffs, self.psi_fn, self.psi_star)
[docs] class Eulerian(uw_object): r""" Eulerian (mesh-based) history manager for time derivatives. Manages the update of a variable :math:`\psi` on the mesh across timesteps, storing history values on mesh variables for backward differentiation. .. math:: \psi_p^{t-n\Delta t} &\leftarrow \psi_p^{t-(n-1)\Delta t} \\ \psi_p^{t-(n-1)\Delta t} &\leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \\ \psi_p^{t-\Delta t} &\leftarrow \psi_p^{t} When ``V_fn`` is provided, the ``update_pre_solve`` method applies an explicit advection correction so that ``bdf()`` approximates the full material derivative :math:`D\psi/Dt = \partial\psi/\partial t + \mathbf{u}\cdot\nabla\psi` rather than the partial time derivative alone. .. note:: The optional advection capability (V_fn parameter) is suitable for problems where the advection is weak or where a purely grid-based approach is desired (e.g., Richards equation with no transport). For advection-dominated problems, SemiLagrangian is more mature and generally preferred. Parameters ---------- mesh : underworld3.discretisation.Mesh The computational mesh. psi_fn : MeshVariable or sympy.Basic The quantity to track. Can be a mesh variable or symbolic expression. vtype : VarType Variable type (SCALAR, VECTOR, etc.) for history storage. degree : int Polynomial degree for history mesh variables. continuous : bool Whether history variables are continuous across element boundaries. V_fn : sympy.Basic, optional Velocity field for grid-based advection correction. If None (default), computes pure ∂ψ/∂t. If set, computes D/Dt = ∂/∂t + u·∇ via operator splitting. evalf : bool, default=False If True, evaluate expressions numerically during updates. theta : float, default=0.5 Time-stepping parameter for implicit/explicit blending. theta=0 is fully explicit, theta=1 is fully implicit. varsymbol : str, default=r"u" LaTeX symbol for display. verbose : bool, default=False Enable verbose output during updates. bcs : list, default=[] Boundary conditions to apply to projections. order : int, default=1 Number of history timesteps to store (for multi-step methods). smoothing : float, default=0.0 Smoothing parameter for projections. See Also -------- SemiLagrangian : For advection-dominated problems with nodal swarm. Lagrangian : For full Lagrangian tracking on swarms. Symbolic : For purely symbolic history (no mesh storage). """
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, psi_fn: Union[ uw.discretisation.MeshVariable, sympy.Basic ], # sympy function or mesh variable vtype: uw.VarType, degree: int, continuous: bool, V_fn=None, evalf: Optional[bool] = False, theta: Optional[float] = 0.5, varsymbol: Optional[str] = r"u", verbose: Optional[bool] = False, bcs=[], order=1, smoothing=0.0, ): super().__init__() self.mesh = mesh self.V_fn = V_fn self.theta = theta self.bcs = bcs self.verbose = verbose self.degree = degree self.vtype = vtype self.continuous = continuous self.smoothing = smoothing self.evalf = evalf # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 # meshVariables are required for: # # u(t) - evaluation of u_fn at the current time # u*(t) - u_* evaluated from # psi is evaluated/stored at `order` timesteps. We can't # be sure if psi is a meshVariable or a function to be evaluated # psi_star is reaching back through each evaluation and has to be a # meshVariable (storage) if isinstance(psi_fn, uw.discretisation.MeshVariable): self._psi_fn = psi_fn.sym ### get symbolic form of the meshvariable self._psi_meshVar = psi_fn else: self._psi_fn = psi_fn ### already in symbolic form self._psi_meshVar = None self.order = order self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF psi_star = [] self.psi_star = psi_star for i in range(order): self.psi_star.append( uw.discretisation.MeshVariable( f"psi_star_Eulerian_{self.instance_number}_{i}", self.mesh, vtype=vtype, degree=degree, continuous=continuous, varsymbol=rf"{varsymbol}^{{ {'*'*(i+1)} }}", ) ) # BDF/AM coefficient UWexpressions — routed through PetscDS constants[] self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number) self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number) # Initialise to order-1 values _update_bdf_values(self._bdf_coeffs, 1, None, []) _update_am_values(self._am_coeffs, 1, self.theta) return
@property def psi_fn(self): r"""Current symbolic expression :math:`\psi` being tracked.""" return self._psi_fn @psi_fn.setter def psi_fn(self, new_fn): """Set the tracked expression.""" self._psi_fn = new_fn # self._psi_star_projection_solver.uw_function = self.psi_fn return def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() ## feedback on this instance # display(Latex(r"$\quad\psi = $ " + self.psi._repr_latex_())) # display( # Latex( # r"$\quad\Delta t_{\textrm{phys}} = $ " # + sympy.sympify(self.dt_physical)._repr_latex_() # ) # ) display(Latex(rf"$\quad$History steps = {self.order}")) def _setup_projections(self): """Initialize projection solvers for history updates.""" ### using this to store terms that can't be evaluated (e.g. derivatives) # The projection operator for mapping derivative values to the mesh - needs to be different for each variable type, unfortunately ... if self.vtype == uw.VarType.SCALAR: self._psi_star_projection_solver = uw.systems.solvers.SNES_Projection( self.mesh, self.psi_star[0], verbose=False ) elif self.vtype == uw.VarType.VECTOR: self._psi_star_projection_solver = uw.systems.solvers.SNES_Vector_Projection( self.mesh, self.psi_star[0], verbose=False ) elif self.vtype == uw.VarType.SYM_TENSOR or self.vtype == uw.VarType.TENSOR: import math dim = self.mesh.dim if self.vtype == uw.VarType.SYM_TENSOR: Nc = math.comb(dim + 1, 2) # 3 in 2D, 6 in 3D self._psi_star_indep_indices = [ (i, j) for i in range(dim) for j in range(i, dim) ] else: Nc = dim * dim self._psi_star_indep_indices = [ (i, j) for i in range(dim) for j in range(dim) ] self._psi_star_flat_var = uw.discretisation.MeshVariable( f"psi_star_flat_{self.instance_number}", self.mesh, (1, Nc), vtype=uw.VarType.MATRIX, degree=self.degree, continuous=self.continuous, varsymbol=r"{\psi^{*}_{\mathrm{flat}}}", ) self._psi_star_projection_solver = uw.systems.solvers.SNES_MultiComponent_Projection( self.mesh, u_Field=self._psi_star_flat_var, n_components=Nc, degree=self.degree, verbose=False, ) self._psi_star_use_multicomponent = True if getattr(self, '_psi_star_use_multicomponent', False): # Flatten tensor to (1, Nc) row for multicomponent solver import sympy indep = self._psi_star_indep_indices row = sympy.Matrix([[self.psi_fn[i, j] for (i, j) in indep]]) self._psi_star_projection_solver.uw_function = row else: self._psi_star_projection_solver.uw_function = self.psi_fn self._psi_star_projection_solver.bcs = self.bcs self._psi_star_projection_solver.smoothing = self.smoothing @property def effective_order(self): """Current effective BDF order, accounting for history startup. For BDF order k, k distinct history values are needed. During startup, ``effective_order`` ramps from 1 to ``self.order`` as successive solves populate the history slots with distinct values. """ # BDF-k requires k completed solves to have k distinct history values. # With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves. return min(self.order, max(1, self._n_solves_completed))
[docs] def update_history_fn(self): r"""Copy current :math:`\psi` to ``psi_star[0]`` via evaluation or projection.""" ### update first value in history chain ### avoids projecting if function can be evaluated try: try: self.psi_star[0].data[...] = self._psi_meshVar.data[...] except: self.psi_star[0].data[...] = uw.function.evaluate( self.psi_fn, self.psi_star[0].coords, evalf=self.evalf, ).reshape(-1, max(self.psi_fn.shape)) except: self._setup_projections() self._psi_star_projection_solver.solve()
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions to ensure the history chain starts from the correct state. """ self.update_history_fn() ### set up all history terms to the initial values for i in range(self.order - 1, 0, -1): self.psi_star[i].data[...] = self.psi_star[0].data[...] self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
[docs] def update( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Update history (alias for ``update_pre_solve``).""" self.update_pre_solve(dt, evalf, verbose) return
[docs] def update_pre_solve( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Pre-solve: auto-initialise history and apply advection correction. On the first call, automatically initialises history from the current field values. If V_fn is set, also applies an explicit grid-based advection correction so that bdf() approximates the material derivative Dφ/Dt rather than ∂φ/∂t. """ self._dt = dt if not self._history_initialised: self.initialise_history() # Update coefficient values for current effective_order and dt _update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history) _update_am_values(self._am_coeffs, self.effective_order, self.theta) if self.V_fn is not None and dt is not None: coords = self.psi_star[0].coords dim = self.mesh.dim X = self.mesh.X # Build u·∇φ symbolically for each component of psi_fn # psi_fn is a Matrix; V_fn is also a Matrix. For scalar # psi_fn the shape is (1,1); for vector it is (1,dim). psi = self.psi_fn V = self.V_fn ncomp = max(psi.shape) # number of tracked components for c in range(ncomp): # ∂φ_c/∂x_i for each spatial dimension grad_c = sympy.Matrix([psi[c].diff(X[i]) for i in range(dim)]) # u·∇φ_c = V_i * ∂φ_c/∂x_i advection_expr = sum(V[i] * grad_c[i] for i in range(dim)) advection_vals = uw.function.evaluate( advection_expr, coords, evalf=evalf, ).reshape(-1) self.psi_star[0].data[:, c] -= dt * advection_vals return
[docs] def update_post_solve( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): r"""Shift history chain after solve: :math:`\psi^{*n} \leftarrow \psi^{*(n-1)}`.""" self._dt = dt if verbose and uw.mpi.rank == 0: print(f"Update {self.psi_fn}", flush=True) # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt ### copy values down the chain for i in range(self.order - 1, 0, -1): self.psi_star[i].data[...] = self.psi_star[i - 1].data[...] ### update the history fn self.update_history_fn() if self._n_solves_completed < self.order: self._n_solves_completed += 1 return
@property def bdf_coefficients(self): """Current BDF coefficients [c0, c1, ...] accounting for variable timesteps.""" return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs] def bdf(self, order=None): r"""Backward differentiation approximation of the time-derivative of :math:`\psi`. Returns a fixed-structure symbolic expression using UWexpression coefficients. Values are updated each step in ``update_pre_solve``. Parameters ---------- order : int, optional Ignored (kept for API compatibility). """ return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs] def adams_moulton_flux(self, order=None): r"""Adams-Moulton flux approximation for implicit time integration. Returns a fixed-structure symbolic expression using UWexpression coefficients. Values are updated each step in ``update_pre_solve``. Parameters ---------- order : int, optional Ignored (kept for API compatibility). """ return _build_weighted_sum(self._am_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs] class SemiLagrangian(uw_object): r""" Semi-Lagrangian history manager using nodal swarm. Manages the semi-Lagrangian update of a mesh variable :math:`\psi` across timesteps. Uses a nodal swarm to track departure points and interpolate values back to the mesh. .. math:: \psi_p^{t-n\Delta t} &\leftarrow \psi_p^{t-(n-1)\Delta t} \\ \psi_p^{t-(n-1)\Delta t} &\leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \\ \psi_p^{t-\Delta t} &\leftarrow \psi_p^{t} The semi-Lagrangian method traces characteristics backward in time to find departure points, providing stable advection without CFL restrictions while maintaining accuracy. Parameters ---------- mesh : underworld3.discretisation.Mesh The computational mesh. psi_fn : sympy.Function The quantity to advect (typically a mesh variable's symbolic form). V_fn : sympy.Function Velocity field for advection (e.g., ``stokes.u.sym``). vtype : VarType Variable type (SCALAR, VECTOR, SYM_TENSOR, etc.). degree : int Polynomial degree for mesh variable storage. continuous : bool Whether variables are continuous across element boundaries. swarm_degree : int, optional Polynomial degree for swarm interpolation. Defaults to ``degree``. swarm_continuous : bool, optional Continuity for swarm variables. Defaults to ``continuous``. varsymbol : str, optional LaTeX symbol for display. verbose : bool, default=False Enable verbose output during updates. bcs : list, default=[] Boundary conditions for projections. order : int, default=1 Number of history timesteps (1 for first-order, 2 for second-order). smoothing : float, default=0.0 Smoothing parameter for projections. preserve_moments : bool, default=False Use moment-preserving projection (experimental). Notes ----- The semi-Lagrangian method is particularly useful for: - Advection-dominated problems (high Péclet number) - Problems where CFL stability is restrictive - Viscoelastic stress advection See Also -------- Eulerian : For fixed-mesh time derivatives without advection. Lagrangian : For full particle-following Lagrangian tracking. """
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, psi_fn: sympy.Function, V_fn: sympy.Function, vtype: uw.VarType, degree: int, continuous: bool, swarm_degree: Optional[int] = None, swarm_continuous: Optional[bool] = None, varsymbol: Optional[str] = None, verbose: Optional[bool] = False, bcs=[], order=1, smoothing=0.0, preserve_moments=False, ): super().__init__() self.mesh = mesh self.bcs = bcs self.verbose = verbose self.degree = degree self.continuous = continuous self._psi_fn = psi_fn self.V_fn = V_fn self.order = order self.preserve_moments = preserve_moments # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF if swarm_degree is None: self.swarm_degree = degree else: self.swarm_degree = swarm_degree if swarm_continuous is None: self.swarm_continuous = continuous else: self.swarm_continuous = swarm_continuous if varsymbol is None: varsymbol = rf"u_{{ [{self.instance_number}] }}" # meshVariables are required for: # # u(t) - evaluation of u_fn at the current time # u*(t) - u_* evaluated from # psi is evaluated/stored at `order` timesteps. We can't # be sure if psi is a meshVariable or a function to be evaluated # but psi_star is reaching back through each evaluation and has to be a # meshVariable (storage) psi_star = [] self.psi_star = psi_star # Propagate units from psi_fn to psi_star if the model supports units. # Internal psi_star variables should match the user's variable units when possible, # but if no reference quantities are set, use unitless variables to avoid strict mode errors. psi_units = uw.get_units(psi_fn) # Check if the model can handle units (has reference quantities set) model = uw.get_default_model() if psi_units is not None and not model.has_units(): # Model doesn't have reference quantities - don't propagate units to internal vars psi_units = None for i in range(order): self.psi_star.append( uw.discretisation.MeshVariable( f"psi_star_sl_{self.instance_number}_{i}", self.mesh, vtype=vtype, degree=self.degree, continuous=self.continuous, varsymbol=rf"{{ {varsymbol}^{{ {'*'*(i+1)} }} }}", units=psi_units, # Inherit units from psi_fn (or None if model has no units) ) ) # BDF/AM coefficient UWexpressions — routed through PetscDS constants[] self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number) self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number) # Initialise to order-1 values _update_bdf_values(self._bdf_coeffs, 1, None, []) _update_am_values(self._am_coeffs, 1, 0.5) # Working variable that has a potentially different discretisation from psi_star # We project from this to psi_star and we use this variable to define the # advection sample points self._workVar = uw.discretisation.MeshVariable( f"W_{self.instance_number}_{i}", self.mesh, vtype=vtype, degree=self.swarm_degree, continuous=self.swarm_continuous, varsymbol=rf"{{ {varsymbol}^\nabla }}", units=psi_units, # Inherit units from psi_fn ) # We just need one swarm since this is inherently a sequential operation nswarm = uw.swarm.NodalPointSwarm(self._workVar, verbose) self._nswarm_psi = nswarm # The projection operator for mapping swarm values to the mesh - needs to be different for # each variable type, unfortunately ... if vtype == uw.VarType.SCALAR: self._psi_star_projection_solver = uw.systems.solvers.SNES_Projection( self.mesh, self.psi_star[0], verbose=False ) elif vtype == uw.VarType.VECTOR: self._psi_star_projection_solver = uw.systems.solvers.SNES_Vector_Projection( self.mesh, self.psi_star[0], verbose=False, ) elif vtype == uw.VarType.SYM_TENSOR or vtype == uw.VarType.TENSOR: import math dim = self.mesh.dim if vtype == uw.VarType.SYM_TENSOR: Nc = math.comb(dim + 1, 2) self._psi_star_indep_indices = [ (i, j) for i in range(dim) for j in range(i, dim) ] else: Nc = dim * dim self._psi_star_indep_indices = [ (i, j) for i in range(dim) for j in range(dim) ] self._psi_star_flat_var = uw.discretisation.MeshVariable( f"psi_star_flat_slcn_{self.instance_number}", self.mesh, (1, Nc), vtype=uw.VarType.MATRIX, degree=degree, continuous=continuous, varsymbol=r"{\psi^{*}_{\mathrm{flat}}}", ) self._psi_star_projection_solver = uw.systems.solvers.SNES_MultiComponent_Projection( self.mesh, u_Field=self._psi_star_flat_var, n_components=Nc, degree=degree, verbose=False, ) self._psi_star_use_multicomponent = True # We should find a way to add natural bcs here # (self.Unknowns.u carried as a symbol from solver to solver) if getattr(self, '_psi_star_use_multicomponent', False): import sympy indep = self._psi_star_indep_indices fn = self._workVar.sym row = sympy.Matrix([[fn[i, j] for (i, j) in indep]]) self._psi_star_projection_solver.uw_function = row else: self._psi_star_projection_solver.uw_function = self._workVar.sym self._psi_star_projection_solver.bcs = bcs self._psi_star_projection_solver.smoothing = smoothing self._smoothing = smoothing self.I = uw.maths.Integral(mesh, None) return
@property def psi_fn(self): r"""Current symbolic expression :math:`\psi` being tracked.""" return self._psi_fn @psi_fn.setter def psi_fn(self, new_fn): """Set the tracked expression.""" self._psi_fn = new_fn if getattr(self, '_psi_star_use_multicomponent', False): import sympy indep = self._psi_star_indep_indices row = sympy.Matrix([[new_fn[i, j] for (i, j) in indep]]) self._psi_star_projection_solver.uw_function = row else: self._psi_star_projection_solver.uw_function = self._psi_fn return def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() display(Latex(rf"$\quad$History steps = {self.order}")) @property def effective_order(self): """Current effective BDF order, accounting for history startup. For BDF order k, k distinct history values are needed. During startup, ``effective_order`` ramps from 1 to ``self.order`` as successive solves populate the history slots with distinct values. """ # BDF-k requires k completed solves to have k distinct history values. # With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves. return min(self.order, max(1, self._n_solves_completed))
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions. """ # Evaluate psi_fn at psi_star node positions and store in psi_star[0] from underworld3.utilities.unit_aware_array import UnitAwareArray coords = self.psi_star[0].coords if hasattr(coords, "magnitude"): coords_nd = uw.non_dimensionalise(coords) if isinstance(coords_nd, UnitAwareArray): coords_nd = np.array(coords_nd) elif hasattr(coords_nd, 'magnitude'): coords_nd = coords_nd.magnitude else: coords_nd = coords try: eval_result = uw.function.evaluate(self.psi_fn, coords_nd) psi_units = self.psi_star[0].units if psi_units is not None and not isinstance(eval_result, UnitAwareArray): eval_result = UnitAwareArray(eval_result, units=psi_units) self.psi_star[0].array[...] = eval_result except Exception: if getattr(self, '_psi_star_use_multicomponent', False): import sympy indep = self._psi_star_indep_indices row = sympy.Matrix([[self.psi_fn[i, j] for (i, j) in indep]]) self._psi_star_projection_solver.uw_function = row self._psi_star_projection_solver.smoothing = 0.0 self._psi_star_projection_solver.solve() # Fan out flat result to tensor psi_star[0] for k, (i, j) in enumerate(indep): vals = self._psi_star_flat_var.array[:, 0, k] self.psi_star[0].array[:, i, j] = vals if i != j: self.psi_star[0].array[:, j, i] = vals else: self._psi_star_projection_solver.uw_function = self.psi_fn self._psi_star_projection_solver.smoothing = 0.0 self._psi_star_projection_solver.solve() # Copy to all other history slots for i in range(1, self.order): self.psi_star[i].array[...] = self.psi_star[0].array[...] self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
[docs] def update( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, dt_physical: Optional = None, ): """Update history (alias for ``update_pre_solve``).""" self.update_pre_solve(dt, evalf, verbose, dt_physical) return
[docs] def update_post_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, dt_physical: Optional[float] = None, ): """Post-solve: record timestep and increment solve counter.""" self._dt = dt # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt if self._n_solves_completed < self.order: self._n_solves_completed += 1 return
[docs] def update_pre_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, dt_physical: Optional[float] = None, store_result: Optional[bool] = True, ): """Sample upstream values along characteristics before solve. On the first call, automatically initialises history from the current field values so that bdf() returns zero on the first step. Parameters ---------- store_result : bool, optional If True (default), evaluate psi_fn at current positions and store in psi_star[0] before advection — the standard DDt behaviour. If False, skip this step and the history shift: only advect the existing psi_star levels upstream. Used by VE_Stokes where psi_star[0] already contains the projected actual stress from the previous solve. """ self._dt = dt if not self._history_initialised: self.initialise_history() # Update coefficient values for current effective_order and dt _update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history) _update_am_values(self._am_coeffs, self.effective_order, 0.5) ## Progress from the oldest part of the history # 1. Copy the stored values down the chain in preparation for the next timestep # The history term is the nodel value of psi_fn offset back along the characteristics # according to the timestep. # That is: # # - psi_star[0] is the current value of psi_fn, sampled # at the location of the nodes in their previous position at t-\Delta t # # - psi_star[1] is the value of psi_star[0] from the previous timestep # sampled at the location of the nodes at t - \Delta t. (note this is approximately # equivalent to the value of psi_star[0] at t - 2\Delta t) # # - psi_star[2] etc if required ... # # First we copy the history, then we sample can sample upstream values if dt_physical is not None: phi = sympy.Min(1, dt / dt_physical) else: phi = sympy.sympify(1) if store_result: for i in range(self.order - 1, 0, -1): self.psi_star[i].array[...] = ( phi * self.psi_star[i - 1].array[...] + (1 - phi) * self.psi_star[i].array[...] ) # 2. Compute the current value of psi_fn which we store in psi_star[0] # Note the need to do a try/except to handle unsupported evaluations # (e.g. of derivatives) # # When store_result=False (e.g. VE stress history), skip this step — # psi_star[0] already contains the projected actual stress from # the previous solve and we want to advect *that*, not the flux. # CRITICAL FIX (2025-11-28): Handle coordinates correctly for unit-aware mode. # Previous bug: extracting .magnitude gives METERS (e.g., 1000000), but: # - mesh.get_closest_cells() expects [0-1] non-dimensional coords # - evaluate() assumes plain numpy is [0-1] non-dimensional # Solution: use uw.non_dimensionalise() for proper conversion, OR pass # unit-aware coords to evaluate() which handles conversion internally. from underworld3.utilities.unit_aware_array import UnitAwareArray psi_star_0_coords = self.psi_star[0].coords # For mesh internal operations, need non-dimensional [0-1] coordinates if hasattr(psi_star_0_coords, "magnitude"): # Unit-aware coords - need to non-dimensionalize (not just extract magnitude!) psi_star_0_coords_nd = uw.non_dimensionalise(psi_star_0_coords) # Extract to plain numpy for mesh operations if isinstance(psi_star_0_coords_nd, UnitAwareArray): psi_star_0_coords_nd = np.array(psi_star_0_coords_nd) elif hasattr(psi_star_0_coords_nd, 'magnitude'): psi_star_0_coords_nd = psi_star_0_coords_nd.magnitude else: psi_star_0_coords_nd = np.array(psi_star_0_coords_nd) else: # Plain numpy - assume already non-dimensional psi_star_0_coords_nd = psi_star_0_coords cellid = self.mesh.get_closest_cells( psi_star_0_coords_nd, ) # Move slightly within the chosen cell to avoid edge effects centroid_coords = self.mesh._centroids[cellid] shift = 0.001 node_coords_nd = (1.0 - shift) * psi_star_0_coords_nd[:, :] + shift * centroid_coords[ :, : ] if store_result: try: # Use shifted ND coords to avoid quad mesh boundary issues # node_coords_nd is slightly shifted toward cell centroids # evaluate() treats plain numpy as ND [0-1] coordinates eval_result = uw.function.evaluate( self.psi_fn, node_coords_nd, evalf=evalf, ) # Wrap result with units if psi_star has units but eval didn't return UnitAwareArray psi_star_units = self.psi_star[0].units if psi_star_units is not None and not isinstance(eval_result, UnitAwareArray): eval_result = UnitAwareArray(eval_result, units=psi_star_units) self.psi_star[0].array[...] = eval_result except Exception: # Fallback to projection solver for expressions that can't be directly evaluated # (e.g., containing derivatives) self._psi_star_projection_solver.uw_function = self.psi_fn self._psi_star_projection_solver.smoothing = 0.0 self._psi_star_projection_solver.solve(verbose=verbose) # 3. Compute the upstream values from the psi_fn # We use the u_star variable as a working value here so we have to work backwards # so we don't over-write the history terms # # Convert dt to model units for numerical arithmetic # (after symbolic logic that may use dt with units) # Note: uw is already imported at module level (line 7) model = uw.get_default_model() # DIAGNOSTIC: Capture information about the unit system coords_template = self.psi_star[0].coords has_units = hasattr(coords_template, "magnitude") or hasattr(coords_template, "_magnitude") # Maintain unit system consistency: either keep everything with units or convert to non-dimensional if has_units: # Physical coordinate system with units # dt must be converted to base SI seconds so that dt * velocity(m/s) = distance(m) if hasattr(dt, "to"): # It's a Pint quantity dt_for_calc = dt.to("second") # Convert to seconds (still a quantity) else: # If dt is already a dimensionless number, treat it as seconds dt_for_calc = dt else: # Non-dimensional coordinate system - convert dt to non-dimensional # CRITICAL: Actually non-dimensionalize the timestep! if hasattr(dt, "magnitude") or hasattr(dt, "value"): # dt has units - non-dimensionalize it dt_nondim = uw.non_dimensionalise(dt, model) # Extract the dimensionless value if hasattr(dt_nondim, "magnitude"): dt_for_calc = float(dt_nondim.magnitude) elif hasattr(dt_nondim, "value"): dt_for_calc = float(dt_nondim.value) else: dt_for_calc = float(dt_nondim) else: # Already dimensionless dt_for_calc = dt for i in range(self.order - 1, -1, -1): # 2nd order update along characteristics # Use shifted ND coords to avoid quad mesh boundary issues # node_coords_nd is slightly shifted toward cell centroids (lines 703-709) # evaluate() treats plain numpy as ND [0-1] coordinates v_result = uw.function.evaluate( self.V_fn, node_coords_nd, ) # CRITICAL: Preserve UnitAwareArray through slicing # Slicing can sometimes return plain numpy views - need to preserve wrapper from underworld3.utilities.unit_aware_array import UnitAwareArray if isinstance(v_result, UnitAwareArray): # Slice and rewrap to preserve units v_at_node_pts = v_result[:, 0, :] if not isinstance(v_at_node_pts, UnitAwareArray): # Slicing lost the wrapper - rewrap it v_at_node_pts = UnitAwareArray(v_at_node_pts, units=v_result.units) else: v_at_node_pts = v_result[:, 0, :] # Non-dimensionalize velocities when working with dimensionless coordinates # This prevents dimensional mismatch: velocities in m/s mixed with coords in [0,1] # CRITICAL: evaluate now returns UnitAwareArray with units attached # Check if velocities already have units before trying to add them manually if not has_units: # Coordinates are dimensionless - need to non-dimensionalize velocities too if isinstance(v_at_node_pts, UnitAwareArray): # Velocities already have units from evaluate - just non-dimensionalize v_nondim = uw.non_dimensionalise(v_at_node_pts, model) # Extract numpy array for dimensionless calculation if isinstance(v_nondim, UnitAwareArray): v_at_node_pts = np.array(v_nondim) elif hasattr(v_nondim, "value"): v_at_node_pts = v_nondim.value else: v_at_node_pts = v_nondim else: # Velocities don't have units - try to add them manually (legacy path) v_units = uw.get_units(self.V_fn) if v_units and v_units != "dimensionless": v_with_units = UnitAwareArray(v_at_node_pts, units=v_units) v_nondim = uw.non_dimensionalise(v_with_units, model) if isinstance(v_nondim, UnitAwareArray): v_at_node_pts = np.array(v_nondim) elif hasattr(v_nondim, "value"): v_at_node_pts = v_nondim.value else: v_at_node_pts = v_nondim else: # Dimensional mode - ensure velocities have units # CRITICAL FIX (2025-11-27): Variable data is stored NON-DIMENSIONALLY. # We must DIMENSIONALIZE (not just wrap) the values before dimensional arithmetic. # Previous bug: wrapping 0.01 (ND) with cm/yr gave 0.01 cm/yr instead of 1 cm/yr. if not isinstance(v_at_node_pts, UnitAwareArray): v_units = uw.get_units(self.V_fn) if v_units and v_units != "dimensionless": # Re-dimensionalize using the scaling system if uw.is_nondimensional_scaling_active(): from underworld3.scaling import dimensionalise # dimensionalise(nd_value, units) -> value * scale in those units v_dimensional = dimensionalise(v_at_node_pts, v_units) v_at_node_pts = UnitAwareArray(v_dimensional.magnitude, units=v_dimensional.units) else: # No scaling active - assume values are already dimensional v_at_node_pts = UnitAwareArray(v_at_node_pts, units=v_units) # Get coordinates coords = self.psi_star[i].coords # CRITICAL: When working in dimensionless mode, extract coords to plain arrays # to match the dimensionless velocities (otherwise unit mismatch occurs) from underworld3.utilities.unit_aware_array import UnitAwareArray if not has_units and isinstance(coords, UnitAwareArray): # Extract to plain numpy for dimensionless arithmetic coords = np.array(coords) # CRITICAL (2025-11-27): Multiply velocity FIRST so UnitAwareArray.__mul__ handles it. # If we do `dt_for_calc * v_at_node_pts`, Pint handles it and loses UnitAwareArray units. mid_pt_coords = coords - v_at_node_pts * (0.5 * dt_for_calc) # Clamp midpoint coordinates to the domain boundary if self.mesh.return_coords_to_bounds is not None: mid_pt_coords = self.mesh.return_coords_to_bounds(mid_pt_coords) v_mid_result = uw.function.global_evaluate( self.V_fn, mid_pt_coords, ) # CRITICAL: Preserve UnitAwareArray through slicing if isinstance(v_mid_result, UnitAwareArray): # Slice and rewrap to preserve units v_at_mid_pts = v_mid_result[:, 0, :] if not isinstance(v_at_mid_pts, UnitAwareArray): # Slicing lost the wrapper - rewrap it v_at_mid_pts = UnitAwareArray(v_at_mid_pts, units=v_mid_result.units) else: v_at_mid_pts = v_mid_result[:, 0, :] # Non-dimensionalize mid-point velocities when working with dimensionless coordinates # CRITICAL: global_evaluate now returns UnitAwareArray with units attached # Check if velocities already have units before trying to add them manually if not has_units: # Coordinates are dimensionless - need to non-dimensionalize velocities too if isinstance(v_at_mid_pts, UnitAwareArray): # Velocities already have units from global_evaluate - just non-dimensionalize v_nondim = uw.non_dimensionalise(v_at_mid_pts, model) # Extract numpy array for dimensionless calculation if isinstance(v_nondim, UnitAwareArray): v_at_mid_pts = np.array(v_nondim) elif hasattr(v_nondim, "value"): v_at_mid_pts = v_nondim.value else: v_at_mid_pts = v_nondim else: # Velocities don't have units - try to add them manually (legacy path) v_units = uw.get_units(self.V_fn) if v_units and v_units != "dimensionless": v_with_units = UnitAwareArray(v_at_mid_pts, units=v_units) v_nondim = uw.non_dimensionalise(v_with_units, model) if isinstance(v_nondim, UnitAwareArray): v_at_mid_pts = np.array(v_nondim) elif hasattr(v_nondim, "value"): v_at_mid_pts = v_nondim.value else: v_at_mid_pts = v_nondim else: # Dimensional mode - ensure velocities have units # CRITICAL: If V_fn doesn't have unit metadata, evaluate() returns plain numpy # We need to manually wrap it with units for dimensional arithmetic to work if not isinstance(v_at_mid_pts, UnitAwareArray): v_units = uw.get_units(self.V_fn) if v_units and v_units != "dimensionless": # Wrap velocities with their proper units v_at_mid_pts = UnitAwareArray(v_at_mid_pts, units=v_units) # Calculate upstream coordinates: current position - velocity * timestep end_pt_coords = coords - v_at_mid_pts * dt_for_calc # Clamp upstream coordinates to the domain boundary if self.mesh.return_coords_to_bounds is not None: end_pt_coords = self.mesh.return_coords_to_bounds(end_pt_coords) # Extract scalar from (1,1) Matrix for scalar variables # MeshVariable.sym returns Matrix([[value]]) for scalars expr_to_evaluate = self.psi_star[i].sym if hasattr(expr_to_evaluate, 'shape') and expr_to_evaluate.shape == (1, 1): expr_to_evaluate = expr_to_evaluate[0, 0] # Evaluate psi_star at upstream coordinates # global_evaluate now returns dimensional results (gateway fix 2025-11-28) value_at_end_points = uw.function.global_evaluate( expr_to_evaluate, end_pt_coords, ) # CRITICAL FIX (2025-11-27): If psi_star has units, ensure the assigned # value also has units. global_evaluate may return plain arrays. psi_star_units = self.psi_star[i].units if psi_star_units is not None and not isinstance(value_at_end_points, UnitAwareArray): value_at_end_points = UnitAwareArray(value_at_end_points, units=psi_star_units) self.psi_star[i].array[...] = value_at_end_points # disable this for now - Compute moments before update if 0 and self.preserve_moments and self._workVar.num_components == 1: self.I.fn = self.psi_star[i].sym[0] Imean0 = self.I.evaluate() self.I.fn = (self.psi_star[i].sym[0] - Imean0) ** 2 IL20 = np.sqrt(self.I.evaluate()) # disable this for now - Restore moments after update if 0 and self.preserve_moments and self._workVar.num_components == 1: self.I.fn = self.psi_star[i].sym[0] Imean = self.I.evaluate() self.I.fn = (self.psi_star[i].sym[0] - Imean) ** 2 IL2 = np.sqrt(self.I.evaluate()) # TODO: DELETE remove swarm.access / data, replace with direct array assignment # with self.mesh.access(self.psi_star[i]): # self.psi_star[i].data[...] += Imean0 - Imean self.psi_star[i].array[...] += Imean0 - Imean self.I.fn = (self.psi_star[i].sym[0] - Imean0) ** 2 IL2 = np.sqrt(self.I.evaluate()) # TODO: DELETE remove swarm.access / data, replace with direct array assignment # with self.mesh.access(self.psi_star[i]): # self.psi_star[i].data[...] = ( # self.psi_star[i].data[...] - Imean0 # ) * IL20 / IL2 + Imean0 self.psi_star[i].array[...] = ( self.psi_star[i].array[...] - Imean0 ) * IL20 / IL2 + Imean0 return
@property def bdf_coefficients(self): """Current BDF coefficients [c0, c1, ...] accounting for variable timesteps.""" return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs] def bdf(self, order=None): r"""Backward differentiation approximation of the time-derivative of :math:`\psi`. Returns a fixed-structure symbolic expression using UWexpression coefficients. Values are updated each step in ``update_pre_solve``. Parameters ---------- order : int, optional Ignored (kept for API compatibility). """ return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs] def adams_moulton_flux(self, order=None): r"""Adams-Moulton flux approximation for implicit time integration. Returns a fixed-structure symbolic expression using UWexpression coefficients. Values are updated each step in ``update_pre_solve``. Parameters ---------- order : int, optional Ignored (kept for API compatibility). """ return _build_weighted_sum(self._am_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
## Consider Deprecating this one - it is the same as the Lagrangian_Swarm but ## sets up the swarm for itself. This does not have a practical use-case - the swarm version ## is slower, more cumbersome, and less stable / accurate. The only reason to use ## it is if there is an existing swarm that we can re-purpose.
[docs] class Lagrangian(uw_object): r""" Swarm-based Lagrangian history manager for material tracking. Manages the update of a Lagrangian variable :math:`\psi` on a swarm across timesteps. Creates and manages its own internal swarm for tracking material properties through the flow. .. math:: \psi_p^{t-n\Delta t} \leftarrow \psi_p^{t-(n-1)\Delta t} \psi_p^{t-(n-1)\Delta t} \leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \psi_p^{t-\Delta t} \leftarrow \psi_p^{t} The Lagrangian approach follows material points through the flow, avoiding numerical diffusion in advection. History values are stored on swarm variables with proxy mesh variables for solver integration. Parameters ---------- mesh : underworld3.discretisation.Mesh The computational mesh. psi_fn : sympy.Function The quantity to track (e.g., stress tensor ``stokes.stress``). V_fn : sympy.Function Velocity field for particle advection. vtype : VarType Variable type (SCALAR, VECTOR, SYM_TENSOR, etc.). degree : int Polynomial degree for proxy mesh variables. continuous : bool Whether proxy variables are continuous across elements. varsymbol : str, default=r"u" LaTeX symbol for display. verbose : bool, default=False Enable verbose output during updates. bcs : list, default=[] Boundary conditions (currently unused for swarm variables). order : int, default=1 Number of history timesteps to store. smoothing : float, default=0.0 Smoothing parameter for projections. fill_param : int, default=3 Fill parameter for swarm population density. Notes ----- The Lagrangian method is ideal for: - Viscoelastic stress history tracking - Material property advection without diffusion - Tracking compositional fields The internal swarm is automatically advected during updates. See Also -------- SemiLagrangian : Mesh-based semi-Lagrangian with departure points. Lagrangian_Swarm : For user-provided swarms. """ instances = ( 0 # count how many of these there are in order to create unique private mesh variable ids )
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, psi_fn: sympy.Function, V_fn: sympy.Function, vtype: uw.VarType, degree: int, continuous: bool, varsymbol: Optional[str] = r"u", verbose: Optional[bool] = False, bcs=[], order=1, smoothing=0.0, fill_param=3, ): super().__init__() # create a new swarm to manage here dudt_swarm = uw.swarm.UWSwarm(mesh) self.mesh = mesh self.swarm = dudt_swarm self.psi_fn = psi_fn self.V_fn = V_fn self.verbose = verbose self.order = order # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF psi_star = [] self.psi_star = psi_star for i in range(order): self.psi_star.append( uw.swarm.SwarmVariable( f"psi_star_sw_{self.instance_number}_{i}", self.swarm, vtype=vtype, proxy_degree=degree, proxy_continuous=continuous, varsymbol=rf"{varsymbol}^{{ {'*'*(i+1)} }}", ) ) # BDF/AM coefficient UWexpressions — routed through PetscDS constants[] self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number) self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number) # Initialise to order-1 values _update_bdf_values(self._bdf_coeffs, 1, None, []) _update_am_values(self._am_coeffs, 1, 0.5) dudt_swarm.populate(fill_param) return
def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() ## feedback on this instance display(Latex(r"$\quad\psi = $ " + self.psi._repr_latex_())) display( Latex( r"$\quad\Delta t_{\textrm{phys}} = $ " + sympy.sympify(self.dt_physical)._repr_latex_() ) ) display(Latex(rf"$\quad$History steps = {self.order}")) @property def effective_order(self): """Current effective BDF order, accounting for history startup.""" # BDF-k requires k completed solves to have k distinct history values. # With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves. return min(self.order, max(1, self._n_solves_completed))
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions. """ psi_star_0 = self.psi_star[0] with self.swarm.access(psi_star_0): for i in range(psi_star_0.shape[0]): for j in range(psi_star_0.shape[1]): updated_psi = uw.function.evaluate( self.psi_fn[i, j], self.swarm.data, ) psi_star_0[i, j].data[:] = updated_psi # Copy to all other history slots for k in range(1, self.order): with self.swarm.access(self.psi_star[k]): self.psi_star[k].data[...] = psi_star_0.data[...] self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
## Note: We may be able to eliminate this ## The SL updater and the Lag updater have ## different sequencing because of the way they ## update the history. It makes more sense for the ## full Lagrangian swarm to be updated after the solve ## and this means we have to grab the history values first.
[docs] def update( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Update history (alias for ``update_post_solve``).""" self.update_post_solve(dt, evalf, verbose) return
[docs] def update_pre_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Pre-solve: auto-initialise history on first call.""" self._dt = dt if not self._history_initialised: self.initialise_history() # Update coefficient values for current effective_order and dt _update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history) _update_am_values(self._am_coeffs, self.effective_order, 0.5) return
[docs] def update_post_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Shift history chain and advect swarm after solve.""" self._dt = dt # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt for h in range(self.order - 1): i = self.order - (h + 1) # copy the information down the chain if verbose: print(f"Lagrange order = {self.order}") print(f"Lagrange copying {i-1} to {i}") self.psi_star[i].array[...] = self.psi_star[i - 1].array[...] # Now update the swarm variable psi_star_0 = self.psi_star[0] with self.swarm.access(psi_star_0): for i in range(psi_star_0.shape[0]): for j in range(psi_star_0.shape[1]): updated_psi = uw.function.evaluate( self.psi_fn[i, j], self.swarm.data, evalf=evalf, ) psi_star_0[i, j].data[:] = updated_psi # Now update the swarm locations self.swarm.advection( self.V_fn, delta_t=dt, restore_points_to_domain_func=self.mesh.return_coords_to_bounds, ) if self._n_solves_completed < self.order: self._n_solves_completed += 1
@property def bdf_coefficients(self): """Current BDF coefficients [c0, c1, ...] accounting for variable timesteps.""" return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs] def bdf(self, order=None): r"""Backward differentiation approximation of the time-derivative of :math:`\psi`. Returns a fixed-structure symbolic expression using UWexpression coefficients. Values are updated each step in ``update_pre_solve``. Parameters ---------- order : int, optional Ignored (kept for API compatibility). """ return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs] def adams_moulton_flux(self, order=None): r"""Adams-Moulton flux approximation for implicit time integration. Returns a fixed-structure symbolic expression using UWexpression coefficients. Values are updated each step in ``update_pre_solve``. Parameters ---------- order : int, optional Ignored (kept for API compatibility). """ return _build_weighted_sum(self._am_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs] class Lagrangian_Swarm(uw_object): r""" Swarm-based Lagrangian history manager (user-provided swarm). Manages the update of a Lagrangian variable :math:`\psi` on a user-supplied swarm across timesteps: .. math:: \psi_p^{t-n\Delta t} \leftarrow \psi_p^{t-(n-1)\Delta t} \psi_p^{t-(n-1)\Delta t} \leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \psi_p^{t-\Delta t} \leftarrow \psi_p^{t} Unlike ``Lagrangian``, this class uses a user-provided swarm rather than creating its own. The swarm should already be populated and configured for tracking material points. Parameters ---------- swarm : underworld3.swarm.Swarm User-provided swarm for material point tracking. psi_fn : sympy.Function The quantity to track (e.g., stress tensor from a solver). vtype : underworld3.VarType Variable type (SCALAR, VECTOR, SYM_TENSOR, etc.). degree : int Interpolation degree for proxy mesh variables. continuous : bool Whether proxy mesh variables should be continuous. varsymbol : str, optional LaTeX symbol for display (default ``"u"``). verbose : bool, optional Enable verbose output (default ``False``). bcs : list, optional Boundary conditions (currently unused, default ``[]``). order : int, optional Order of time integration (1-3) (default ``1``). smoothing : float, optional Smoothing parameter (default ``0.0``). step_averaging : int, optional Number of steps for history averaging (default ``2``). Attributes ---------- mesh : underworld3.discretisation.Mesh Reference to the computational mesh (from swarm). swarm : underworld3.swarm.Swarm The user-provided swarm. psi_fn : sympy.Function Symbolic expression for the tracked quantity. order : int Order of BDF integration. step_averaging : int Number of steps for averaging (affects BDF scaling). psi_star : list History values :math:`\psi^*, \psi^{**}, \ldots` as swarm variables. Notes ----- Key differences from ``Lagrangian`` class: - Uses user-provided swarm (not internally created) - Swarm advection is NOT performed (user's responsibility) - Step averaging for smoothing history updates - Suitable when swarm is shared between multiple history managers The ``step_averaging`` parameter scales the BDF formula to account for updates that occur over multiple sub-steps within a timestep. See Also -------- Lagrangian : Creates and manages its own swarm with advection. SemiLagrangian : Nodal-swarm approach for advection-dominated problems. Eulerian : Pure mesh-based history (no particle tracking). """ instances = ( 0 # count how many of these there are in order to create unique private mesh variable ids )
[docs] @timing.routine_timer_decorator def __init__( self, swarm: uw.swarm.Swarm, psi_fn: sympy.Function, vtype: uw.VarType, degree: int, continuous: bool, varsymbol: Optional[str] = r"u", verbose: Optional[bool] = False, bcs=[], order=1, smoothing=0.0, step_averaging=2, ): super().__init__() self.mesh = swarm.mesh self.swarm = swarm self.psi_fn = psi_fn self.verbose = verbose self.order = order self.step_averaging = step_averaging # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF psi_star = [] self.psi_star = psi_star for i in range(order): self.psi_star.append( uw.swarm.SwarmVariable( f"psi_star_sw_{self.instance_number}_{i}", self.swarm, vtype=vtype, proxy_degree=degree, proxy_continuous=continuous, varsymbol=rf"{varsymbol}^{{ {'*'*(i+1)} }}", ) ) # BDF/AM coefficient UWexpressions — routed through PetscDS constants[] self._bdf_coeffs = _create_coefficients(order, r"c^{\mathrm{BDF}}", self.instance_number) self._am_coeffs = _create_coefficients(order, r"a^{\mathrm{AM}}", self.instance_number) # Initialise to order-1 values _update_bdf_values(self._bdf_coeffs, 1, None, []) _update_am_values(self._am_coeffs, 1, 0.5) return
def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() ## feedback on this instance display(Latex(r"$\quad\psi = $ " + self.psi._repr_latex_())) display( Latex( r"$\quad\Delta t_{\textrm{phys}} = $ " + sympy.sympify(self.dt_physical)._repr_latex_() ) ) display(Latex(rf"$\quad$History steps = {self.order}")) @property def effective_order(self): """Current effective BDF order, accounting for history startup.""" # BDF-k requires k completed solves to have k distinct history values. # With 0 or 1 completed solves → order 1. Order 2 needs ≥2 solves. return min(self.order, max(1, self._n_solves_completed))
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions. """ psi_star_0 = self.psi_star[0] with self.swarm.access(psi_star_0): for i in range(psi_star_0.shape[0]): for j in range(psi_star_0.shape[1]): updated_psi = uw.function.evaluate( self.psi_fn[i, j], self.swarm.data, ) psi_star_0[i, j].data[:] = updated_psi # Copy to all other history slots for k in range(1, self.order): with self.swarm.access(self.psi_star[k]): self.psi_star[k].data[...] = psi_star_0.data[...] self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
## Note: We may be able to eliminate this ## The SL updater and the Lag updater have ## different sequencing because of the way they ## update the history. It makes more sense for the ## full Lagrangian swarm to be updated after the solve ## and this means we have to grab the history values first.
[docs] def update( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Update history (alias for ``update_post_solve``).""" self.update_post_solve(dt, evalf, verbose) return
[docs] def update_pre_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Pre-solve: auto-initialise history on first call.""" self._dt = dt if not self._history_initialised: self.initialise_history() # Update coefficient values for current effective_order and dt _update_bdf_values(self._bdf_coeffs, self.effective_order, self._dt, self._dt_history) _update_am_values(self._am_coeffs, self.effective_order, 0.5) return
[docs] def update_post_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): r"""Shift history chain and evaluate current :math:`\psi` on swarm.""" self._dt = dt # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt for h in range(self.order - 1): i = self.order - (h + 1) # copy the information down the chain if verbose: print(f"Lagrange swarm order = {self.order}", flush=True) print( f"Mesh interpolant order = {self.psi_star[0]._meshVar.degree}", flush=True, ) print(f"Lagrange swarm copying {i-1} to {i}", flush=True) with self.swarm.access(self.psi_star[i]): self.psi_star[i].data[...] = self.psi_star[i - 1].data[...] phi = 1 / self.step_averaging psi_star_0 = self.psi_star[0] with self.swarm.access(psi_star_0): for i in range(psi_star_0.shape[0]): for j in range(psi_star_0.shape[1]): updated_psi = uw.function.evaluate( self.psi_fn[i, j], self.swarm.data, evalf=evalf, ) psi_star_0[i, j].data[:] = ( phi * updated_psi + (1 - phi) * psi_star_0[i, j].data[:] ) if self._n_solves_completed < self.order: self._n_solves_completed += 1 return
@property def bdf_coefficients(self): """Current BDF coefficients [c0, c1, ...] accounting for variable timesteps.""" return _bdf_coefficients(self.effective_order, self._dt, self._dt_history)
[docs] def bdf(self, order=None): r"""Backward differentiation approximation of the time-derivative of :math:`\psi`. Returns a fixed-structure symbolic expression using UWexpression coefficients. Values are updated each step in ``update_pre_solve``. Parameters ---------- order : int, optional Ignored (kept for API compatibility). """ return _build_weighted_sum(self._bdf_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])
[docs] def adams_moulton_flux(self, order=None): r"""Adams-Moulton flux approximation for implicit time integration. Returns a fixed-structure symbolic expression using UWexpression coefficients. Values are updated each step in ``update_pre_solve``. Parameters ---------- order : int, optional Ignored (kept for API compatibility). """ return _build_weighted_sum(self._am_coeffs, self.psi_fn, [ps.sym for ps in self.psi_star])