Source code for underworld3.systems.ddt

r"""
Time derivative approximations for transient problems.

This module provides classes for computing time derivatives (D/Dt) using
various numerical schemes. These are used within solvers to discretize the
time dimension of PDEs.

All DDt classes share a common interface:

- ``update(dt, ...)`` — alias for ``update_pre_solve``
- ``update_pre_solve(dt, ...)`` — called before the PDE solve
- ``update_post_solve(dt, ...)`` — called after the PDE solve
- ``bdf(order)`` — backward differentiation formula (returns Δψ, divide by Δt for rate)
- ``adams_moulton_flux(order)`` — weighted flux for implicit integration

**Symbolic** -- Pure symbolic history, no mesh storage. Used for flux
tracking in SNES_Diffusion where the flux expression is a SymPy tree.

**Eulerian** -- Fixed-grid time derivative with optional grid-based
advection. When ``V_fn`` is provided, ``update_pre_solve`` applies an
explicit advection correction so that ``bdf()`` approximates the full
material derivative.

**SemiLagrangian** -- Characteristic-based D/Dt via departure points.
Traces backward along velocity field to sample upstream values.
Unconditionally stable for advection (no CFL constraint).

**Lagrangian** -- Full particle-following D/Dt. Creates and manages
its own swarm, advected during ``update_post_solve``.

**Lagrangian_Swarm** -- Specialized swarm-based Lagrangian using a
user-provided swarm. Swarm advection is the user's responsibility.

Notes
-----
The choice of time derivative scheme affects accuracy, stability, and
computational cost:

- **Eulerian**: Fixed grid, simple, best for weak-to-moderate advection
  or purely diffusive problems (e.g. Richards equation with no transport).
- **Eulerian (with V_fn)**: Adds explicit advection correction — subject
  to CFL but suitable when velocity is moderate.
- **Semi-Lagrangian**: Best for advection-dominated problems (high Péclet).
  Unconditionally stable but breaks down when v → 0.
- **Lagrangian**: Tracks material properties without numerical diffusion.
  Best for purely advected quantities (e.g. composition).

See Also
--------
underworld3.systems.solvers : PDE solvers using these time derivatives.
"""

import sympy
from sympy import sympify
import numpy as np

from typing import Optional, Callable, Union

import underworld3 as uw
from underworld3 import VarType

import underworld3.timing as timing
from underworld3.utilities._api_tools import uw_object

from petsc4py import PETSc


def _as_float(value):
    """Extract a plain float from various numeric types (Pint, UWQuantity, etc.)."""
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if hasattr(value, "magnitude"):
        return float(value.magnitude)
    if hasattr(value, "value"):
        return float(value.value)
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _bdf_coefficients(order, dt_current, dt_history):
    r"""Compute BDF coefficients, handling variable timesteps.

    For uniform timesteps the classical BDF-k coefficients are returned
    as exact ``sympy.Rational`` values.  When the timestep ratio differs
    from unity the variable-step BDF-2 formula is used (BDF-3 falls back
    to variable-step BDF-2 when the ratio exceeds 5 %).

    Parameters
    ----------
    order : int
        BDF order (1, 2, or 3).
    dt_current : float or None
        Current timestep :math:`\Delta t_n`.
    dt_history : list
        Previous timesteps ``[dt_{n-1}, dt_{n-2}, ...]``.

    Returns
    -------
    list of sympy.Basic
        Coefficients ``[c0, c1, c2, ...]`` such that

        .. math::

            \Delta\psi_{\mathrm{BDF}} = c_0\,\psi^{n+1}
                + c_1\,\psi^n + c_2\,\psi^{n-1} + \cdots

    Notes
    -----
    **BDF-1** (backward Euler) always returns ``[1, -1]``.

    **BDF-2** with variable step uses
    :math:`r = \Delta t_n / \Delta t_{n-1}`:

    .. math::

        c_0 = \frac{1+2r}{1+r}, \quad
        c_1 = -(1+r), \quad
        c_2 = \frac{r^2}{1+r}

    which reduces to :math:`[3/2,\;-2,\;1/2]` when :math:`r=1`.

    **BDF-3** uses the constant-step formula when all ratios are within
    5 % of unity; otherwise it falls back to variable-step BDF-2.
    """
    if order <= 1:
        return [sympy.Integer(1), sympy.Integer(-1)]

    dt_n = _as_float(dt_current)
    dt_nm1 = (
        _as_float(dt_history[0])
        if len(dt_history) > 0 and dt_history[0] is not None
        else None
    )

    if order == 2:
        if dt_n is not None and dt_nm1 is not None and dt_nm1 > 0:
            r = dt_n / dt_nm1
            if abs(r - 1.0) < 1e-12:
                # Exactly constant dt — use exact rationals
                return [
                    sympy.Rational(3, 2),
                    sympy.Integer(-2),
                    sympy.Rational(1, 2),
                ]
            else:
                # Variable dt
                return [
                    sympy.sympify((1 + 2 * r) / (1 + r)),
                    sympy.sympify(-(1 + r)),
                    sympy.sympify(r**2 / (1 + r)),
                ]
        else:
            # No usable history — constant-dt fallback
            return [
                sympy.Rational(3, 2),
                sympy.Integer(-2),
                sympy.Rational(1, 2),
            ]

    # order >= 3
    dt_nm2 = (
        _as_float(dt_history[1])
        if len(dt_history) > 1 and dt_history[1] is not None
        else None
    )
    if (
        dt_n is not None
        and dt_nm1 is not None
        and dt_nm2 is not None
        and dt_nm1 > 0
        and dt_nm2 > 0
    ):
        r1 = dt_n / dt_nm1
        r2 = dt_nm1 / dt_nm2
        if abs(r1 - 1.0) < 0.05 and abs(r2 - 1.0) < 0.05:
            # Approximately constant dt — standard BDF-3
            return [
                sympy.Rational(11, 6),
                sympy.Integer(-3),
                sympy.Rational(3, 2),
                sympy.Rational(-1, 3),
            ]
        else:
            # Variable dt — fall back to BDF-2 with variable coefficients
            return _bdf_coefficients(2, dt_current, dt_history)
    else:
        # Insufficient history — constant-dt BDF-3
        return [
            sympy.Rational(11, 6),
            sympy.Integer(-3),
            sympy.Rational(3, 2),
            sympy.Rational(-1, 3),
        ]


[docs] class Symbolic(uw_object): r""" Symbolic history manager for time derivative approximations. Manages the update of a variable :math:`\psi` across timesteps. The history operator stores :math:`\psi` over several timesteps (given by ``order``) so that it can compute backward differentiation (BDF) or Adams-Moulton expressions. The history operator is defined as: .. math:: \psi_p^{t-n\Delta t} &\leftarrow \psi_p^{t-(n-1)\Delta t} \\ \psi_p^{t-(n-1)\Delta t} &\leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \\ \psi_p^{t-\Delta t} &\leftarrow \psi_p^{t} This is a purely symbolic history manager that operates on sympy expressions without mesh or swarm storage. It is useful for tracking symbolic expressions through time-stepping algorithms. Parameters ---------- psi_fn : sympy.Basic The sympy expression to track. Can be scalar or matrix form. theta : float, optional Implicitness parameter for Adams-Moulton order 1 (default ``0.5``). Values: 0 = explicit, 1 = implicit, 0.5 = Crank-Nicolson. varsymbol : str, optional LaTeX symbol for display (default ``r"\\psi"``). verbose : bool, optional Enable verbose output (default ``False``). bcs : list, optional Boundary conditions (default ``[]``). order : int, optional Order of time integration (1-3) (default ``1``). smoothing : float, optional Smoothing parameter (default ``0.0``). Notes ----- The ``Symbolic`` class is the base for understanding BDF and Adams-Moulton formulas without the complexity of mesh or swarm storage. It is primarily useful for: - Understanding time-stepping algorithm behavior - Debugging symbolic expressions in time-dependent problems - Prototyping before implementing with mesh/swarm storage For actual simulations, use ``Eulerian``, ``SemiLagrangian``, or ``Lagrangian`` which store history on computational meshes or swarms. See Also -------- Eulerian : Mesh-based history with BDF time-stepping. SemiLagrangian : Nodal-swarm approach for advection-dominated problems. Lagrangian : Swarm-based material tracking. """
[docs] @timing.routine_timer_decorator def __init__( self, psi_fn: sympy.Basic, # a sympy expression for ψ; can be scalar or matrix theta: Optional[float] = 0.5, varsymbol: Optional[str] = r"\psi", verbose: Optional[bool] = False, bcs=[], order: int = 1, smoothing: float = 0.0, ): super().__init__() self.theta = theta self.bcs = bcs self.verbose = verbose self.smoothing = smoothing self.order = order # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF # Ensure psi_fn is a sympy Matrix. if not isinstance(psi_fn, sympy.Matrix): try: psi_fn = sympy.Matrix(psi_fn) except Exception: psi_fn = sympy.Matrix([[psi_fn]]) self._psi_fn = psi_fn # stored with its native shape self._shape = psi_fn.shape # capture the shape # Set the display symbol for psi_fn and for the history variable. self._psi_fn_symbol = varsymbol # e.g. "\psi" self._psi_star_symbol = varsymbol + r"^\ast" # e.g. "\psi^\ast" # Create the history list: each element is a Matrix of shape _shape. self.psi_star = [sympy.zeros(*self._shape) for _ in range(order)] return
@property def psi_fn(self): r"""Current symbolic expression :math:`\psi` being tracked.""" return self._psi_fn @psi_fn.setter def psi_fn(self, new_fn): """Set the tracked symbolic expression.""" if not isinstance(new_fn, sympy.Matrix): try: new_fn = sympy.Matrix(new_fn) except Exception: new_fn = sympy.Matrix([[new_fn]]) # Optionally, one could check for matching shape; here we update both. self._psi_fn = new_fn self._shape = new_fn.shape return def _object_viewer(self): from IPython.display import Latex, display # Display the primary variable display(Latex(rf"$\quad {self._psi_fn_symbol} = {sympy.latex(self._psi_fn)}$")) # Display the history variable using the different symbol. history_latex = ", ".join([sympy.latex(elem) for elem in self.psi_star]) display(Latex(rf"$\quad {self._psi_star_symbol} = \left[{history_latex}\right]$")) @property def effective_order(self): """Current effective BDF order, accounting for history startup. For BDF order k, k distinct history values are needed. During startup, ``effective_order`` ramps from 1 to ``self.order`` as successive solves populate the history slots with distinct values. """ return min(self.order, max(1, self._n_solves_completed + 1))
[docs] def update_history_fn(self): r"""Copy current :math:`\psi` to the first history slot ``psi_star[0]``.""" # Update the first history element with a copy of the current ψ. self.psi_star[0] = self.psi_fn.copy()
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions. """ self.update_history_fn() # Propagate the initial history to all history steps. for i in range(1, self.order): self.psi_star[i] = self.psi_star[0].copy() self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
[docs] def update( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Update history (alias for ``update_pre_solve``).""" self.update_pre_solve(dt, evalf, verbose) return
[docs] def update_pre_solve( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Pre-solve update hook. Auto-initialises history on first call.""" self._dt = dt if not self._history_initialised: self.initialise_history() return
[docs] def update_post_solve( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): r"""Shift history chain after solve: :math:`\psi^{*n} \leftarrow \psi^{*(n-1)}`.""" self._dt = dt if verbose: print(f"Updating history for ψ = {self.psi_fn}", flush=True) # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt # Shift history: copy each element down the chain. for i in range(self.order - 1, 0, -1): self.psi_star[i] = self.psi_star[i - 1].copy() self.update_history_fn() if self._n_solves_completed < self.order: self._n_solves_completed += 1 return
[docs] def bdf(self, order: Optional[int] = None): r"""Compute the backward differentiation approximation of the time-derivative of ψ. For order 1: bdf ≡ ψ - psi_star[0] When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup to avoid using higher-order BDF coefficients before distinct history values are available. For order > 1 with variable timesteps, the BDF coefficients are adjusted using the ratio of consecutive timesteps (see ``_bdf_coefficients``). """ if order is None: order = self.effective_order else: order = max(1, min(self.effective_order, order)) coeffs = _bdf_coefficients(order, self._dt, self._dt_history) # Build BDF sum: c0 * psi + c1 * psi_star[0] + c2 * psi_star[1] + ... with sympy.core.evaluate(False): bdf0 = coeffs[0] * self.psi_fn for i in range(1, len(coeffs)): bdf0 = bdf0 + coeffs[i] * self.psi_star[i - 1] return bdf0
[docs] def adams_moulton_flux(self, order: Optional[int] = None): r"""Adams-Moulton flux approximation for implicit time integration. When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup. Parameters ---------- order : int, optional Order of the approximation (1-3). Defaults to ``effective_order``. Returns ------- sympy.Matrix Weighted average of :math:`\psi` and history terms. Notes ----- The Adams-Moulton formulas for order 1-3 are: - Order 1: :math:`\theta \psi + (1-\theta) \psi^*` - Order 2: :math:`\frac{5\psi + 8\psi^* - \psi^{**}}{12}` - Order 3: :math:`\frac{9\psi + 19\psi^* - 5\psi^{**} + \psi^{***}}{24}` """ if order is None: order = self.effective_order else: order = max(1, min(self.effective_order, order)) with sympy.core.evaluate(False): if order == 1: am = self.theta * self.psi_fn + (1.0 - self.theta) * self.psi_star[0] elif order == 2: am = (5 * self.psi_fn + 8 * self.psi_star[0] - self.psi_star[1]) / 12 elif order == 3: am = ( 9 * self.psi_fn + 19 * self.psi_star[0] - 5 * self.psi_star[1] + self.psi_star[2] ) / 24 return am
[docs] class Eulerian(uw_object): r""" Eulerian (mesh-based) history manager for time derivatives. Manages the update of a variable :math:`\psi` on the mesh across timesteps, storing history values on mesh variables for backward differentiation. .. math:: \psi_p^{t-n\Delta t} &\leftarrow \psi_p^{t-(n-1)\Delta t} \\ \psi_p^{t-(n-1)\Delta t} &\leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \\ \psi_p^{t-\Delta t} &\leftarrow \psi_p^{t} When ``V_fn`` is provided, the ``update_pre_solve`` method applies an explicit advection correction so that ``bdf()`` approximates the full material derivative :math:`D\psi/Dt = \partial\psi/\partial t + \mathbf{u}\cdot\nabla\psi` rather than the partial time derivative alone. .. note:: The optional advection capability (V_fn parameter) is suitable for problems where the advection is weak or where a purely grid-based approach is desired (e.g., Richards equation with no transport). For advection-dominated problems, SemiLagrangian is more mature and generally preferred. Parameters ---------- mesh : underworld3.discretisation.Mesh The computational mesh. psi_fn : MeshVariable or sympy.Basic The quantity to track. Can be a mesh variable or symbolic expression. vtype : VarType Variable type (SCALAR, VECTOR, etc.) for history storage. degree : int Polynomial degree for history mesh variables. continuous : bool Whether history variables are continuous across element boundaries. V_fn : sympy.Basic, optional Velocity field for grid-based advection correction. If None (default), computes pure ∂ψ/∂t. If set, computes D/Dt = ∂/∂t + u·∇ via operator splitting. evalf : bool, default=False If True, evaluate expressions numerically during updates. theta : float, default=0.5 Time-stepping parameter for implicit/explicit blending. theta=0 is fully explicit, theta=1 is fully implicit. varsymbol : str, default=r"u" LaTeX symbol for display. verbose : bool, default=False Enable verbose output during updates. bcs : list, default=[] Boundary conditions to apply to projections. order : int, default=1 Number of history timesteps to store (for multi-step methods). smoothing : float, default=0.0 Smoothing parameter for projections. See Also -------- SemiLagrangian : For advection-dominated problems with nodal swarm. Lagrangian : For full Lagrangian tracking on swarms. Symbolic : For purely symbolic history (no mesh storage). """
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, psi_fn: Union[ uw.discretisation.MeshVariable, sympy.Basic ], # sympy function or mesh variable vtype: uw.VarType, degree: int, continuous: bool, V_fn=None, evalf: Optional[bool] = False, theta: Optional[float] = 0.5, varsymbol: Optional[str] = r"u", verbose: Optional[bool] = False, bcs=[], order=1, smoothing=0.0, ): super().__init__() self.mesh = mesh self.V_fn = V_fn self.theta = theta self.bcs = bcs self.verbose = verbose self.degree = degree self.vtype = vtype self.continuous = continuous self.smoothing = smoothing self.evalf = evalf # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 # meshVariables are required for: # # u(t) - evaluation of u_fn at the current time # u*(t) - u_* evaluated from # psi is evaluated/stored at `order` timesteps. We can't # be sure if psi is a meshVariable or a function to be evaluated # psi_star is reaching back through each evaluation and has to be a # meshVariable (storage) if isinstance(psi_fn, uw.discretisation.MeshVariable): self._psi_fn = psi_fn.sym ### get symbolic form of the meshvariable self._psi_meshVar = psi_fn else: self._psi_fn = psi_fn ### already in symbolic form self._psi_meshVar = None self.order = order self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF psi_star = [] self.psi_star = psi_star for i in range(order): self.psi_star.append( uw.discretisation.MeshVariable( f"psi_star_Eulerian_{self.instance_number}_{i}", self.mesh, vtype=vtype, degree=degree, continuous=continuous, varsymbol=rf"{varsymbol}^{{ {'*'*(i+1)} }}", ) ) return
@property def psi_fn(self): r"""Current symbolic expression :math:`\psi` being tracked.""" return self._psi_fn @psi_fn.setter def psi_fn(self, new_fn): """Set the tracked expression.""" self._psi_fn = new_fn # self._psi_star_projection_solver.uw_function = self.psi_fn return def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() ## feedback on this instance # display(Latex(r"$\quad\psi = $ " + self.psi._repr_latex_())) # display( # Latex( # r"$\quad\Delta t_{\textrm{phys}} = $ " # + sympy.sympify(self.dt_physical)._repr_latex_() # ) # ) display(Latex(rf"$\quad$History steps = {self.order}")) def _setup_projections(self): """Initialize projection solvers for history updates.""" ### using this to store terms that can't be evaluated (e.g. derivatives) # The projection operator for mapping derivative values to the mesh - needs to be different for each variable type, unfortunately ... if self.vtype == uw.VarType.SCALAR: self._psi_star_projection_solver = uw.systems.solvers.SNES_Projection( self.mesh, self.psi_star[0], verbose=False ) elif self.vtype == uw.VarType.VECTOR: self._psi_star_projection_solver = uw.systems.solvers.SNES_Vector_Projection( self.mesh, self.psi_star[0], verbose=False ) elif self.vtype == uw.VarType.SYM_TENSOR or self.vtype == uw.VarType.TENSOR: self._WorkVar = uw.discretisation.MeshVariable( f"W_star_Eulerian_{self.instance_number}", self.mesh, vtype=uw.VarType.SCALAR, degree=self.degree, continuous=self.continuous, varsymbol=r"W^{*}", ) self._psi_star_projection_solver = uw.systems.solvers.SNES_Tensor_Projection( self.mesh, self.psi_star[0], self._WorkVar, verbose=False ) self._psi_star_projection_solver.uw_function = self.psi_fn self._psi_star_projection_solver.bcs = self.bcs self._psi_star_projection_solver.smoothing = self.smoothing @property def effective_order(self): """Current effective BDF order, accounting for history startup. For BDF order k, k distinct history values are needed. During startup, ``effective_order`` ramps from 1 to ``self.order`` as successive solves populate the history slots with distinct values. """ return min(self.order, max(1, self._n_solves_completed + 1))
[docs] def update_history_fn(self): r"""Copy current :math:`\psi` to ``psi_star[0]`` via evaluation or projection.""" ### update first value in history chain ### avoids projecting if function can be evaluated try: try: self.psi_star[0].data[...] = self._psi_meshVar.data[...] except: self.psi_star[0].data[...] = uw.function.evaluate( self.psi_fn, self.psi_star[0].coords, evalf=self.evalf, ).reshape(-1, max(self.psi_fn.shape)) except: self._setup_projections() self._psi_star_projection_solver.solve()
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions to ensure the history chain starts from the correct state. """ self.update_history_fn() ### set up all history terms to the initial values for i in range(self.order - 1, 0, -1): self.psi_star[i].data[...] = self.psi_star[0].data[...] self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
[docs] def update( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Update history (alias for ``update_pre_solve``).""" self.update_pre_solve(dt, evalf, verbose) return
[docs] def update_pre_solve( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Pre-solve: auto-initialise history and apply advection correction. On the first call, automatically initialises history from the current field values. If V_fn is set, also applies an explicit grid-based advection correction so that bdf() approximates the material derivative Dφ/Dt rather than ∂φ/∂t. """ self._dt = dt if not self._history_initialised: self.initialise_history() if self.V_fn is not None and dt is not None: coords = self.psi_star[0].coords dim = self.mesh.dim X = self.mesh.X # Build u·∇φ symbolically for each component of psi_fn # psi_fn is a Matrix; V_fn is also a Matrix. For scalar # psi_fn the shape is (1,1); for vector it is (1,dim). psi = self.psi_fn V = self.V_fn ncomp = max(psi.shape) # number of tracked components for c in range(ncomp): # ∂φ_c/∂x_i for each spatial dimension grad_c = sympy.Matrix([psi[c].diff(X[i]) for i in range(dim)]) # u·∇φ_c = V_i * ∂φ_c/∂x_i advection_expr = sum(V[i] * grad_c[i] for i in range(dim)) advection_vals = uw.function.evaluate( advection_expr, coords, evalf=evalf, ).reshape(-1) self.psi_star[0].data[:, c] -= dt * advection_vals return
[docs] def update_post_solve( self, dt, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): r"""Shift history chain after solve: :math:`\psi^{*n} \leftarrow \psi^{*(n-1)}`.""" self._dt = dt if verbose and uw.mpi.rank == 0: print(f"Update {self.psi_fn}", flush=True) # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt ### copy values down the chain for i in range(self.order - 1, 0, -1): self.psi_star[i].data[...] = self.psi_star[i - 1].data[...] ### update the history fn self.update_history_fn() if self._n_solves_completed < self.order: self._n_solves_completed += 1 return
[docs] def bdf(self, order=None): r"""Backwards differentiation form for calculating DuDt. Note that you will need ``bdf`` / :math:`\delta t` in computing derivatives. When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup to avoid using higher-order BDF coefficients before distinct history values are available. For order > 1 with variable timesteps, the BDF coefficients are adjusted using the ratio of consecutive timesteps (see ``_bdf_coefficients``). """ if order is None: order = self.effective_order else: order = max(1, min(self.effective_order, order)) coeffs = _bdf_coefficients(order, self._dt, self._dt_history) # Build BDF sum: c0 * psi + c1 * psi_star[0].sym + ... with sympy.core.evaluate(False): bdf0 = coeffs[0] * self.psi_fn for i in range(1, len(coeffs)): bdf0 = bdf0 + coeffs[i] * self.psi_star[i - 1].sym return bdf0
[docs] def adams_moulton_flux(self, order=None): r"""Adams-Moulton flux approximation for implicit time integration. When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup. Parameters ---------- order : int, optional Order of the approximation (0-3). Defaults to ``effective_order``. Returns ------- sympy.Basic Weighted average of :math:`\psi` and history terms. .. warning:: For order > 1, the coefficients assume uniform timestep spacing. Variable Δt with order > 1 requires modified coefficients. Use order=1 when timestep varies significantly. """ if order is None: order = self.effective_order else: order = max(0, min(self.effective_order, order)) with sympy.core.evaluate(False): if order == 0: am = self.psi_fn elif order == 1: am = self.theta * self.psi_fn + (1.0 - self.theta) * self.psi_star[0].sym elif order == 2: am = (5 * self.psi_fn + 8 * self.psi_star[0].sym - self.psi_star[1].sym) / 12 elif order == 3: am = ( 9 * self.psi_fn + 19 * self.psi_star[0].sym - 5 * self.psi_star[1].sym + self.psi_star[2].sym ) / 24 return am
[docs] class SemiLagrangian(uw_object): r""" Semi-Lagrangian history manager using nodal swarm. Manages the semi-Lagrangian update of a mesh variable :math:`\psi` across timesteps. Uses a nodal swarm to track departure points and interpolate values back to the mesh. .. math:: \psi_p^{t-n\Delta t} &\leftarrow \psi_p^{t-(n-1)\Delta t} \\ \psi_p^{t-(n-1)\Delta t} &\leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \\ \psi_p^{t-\Delta t} &\leftarrow \psi_p^{t} The semi-Lagrangian method traces characteristics backward in time to find departure points, providing stable advection without CFL restrictions while maintaining accuracy. Parameters ---------- mesh : underworld3.discretisation.Mesh The computational mesh. psi_fn : sympy.Function The quantity to advect (typically a mesh variable's symbolic form). V_fn : sympy.Function Velocity field for advection (e.g., ``stokes.u.sym``). vtype : VarType Variable type (SCALAR, VECTOR, SYM_TENSOR, etc.). degree : int Polynomial degree for mesh variable storage. continuous : bool Whether variables are continuous across element boundaries. swarm_degree : int, optional Polynomial degree for swarm interpolation. Defaults to ``degree``. swarm_continuous : bool, optional Continuity for swarm variables. Defaults to ``continuous``. varsymbol : str, optional LaTeX symbol for display. verbose : bool, default=False Enable verbose output during updates. bcs : list, default=[] Boundary conditions for projections. order : int, default=1 Number of history timesteps (1 for first-order, 2 for second-order). smoothing : float, default=0.0 Smoothing parameter for projections. preserve_moments : bool, default=False Use moment-preserving projection (experimental). Notes ----- The semi-Lagrangian method is particularly useful for: - Advection-dominated problems (high Péclet number) - Problems where CFL stability is restrictive - Viscoelastic stress advection See Also -------- Eulerian : For fixed-mesh time derivatives without advection. Lagrangian : For full particle-following Lagrangian tracking. """
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, psi_fn: sympy.Function, V_fn: sympy.Function, vtype: uw.VarType, degree: int, continuous: bool, swarm_degree: Optional[int] = None, swarm_continuous: Optional[bool] = None, varsymbol: Optional[str] = None, verbose: Optional[bool] = False, bcs=[], order=1, smoothing=0.0, preserve_moments=False, ): super().__init__() self.mesh = mesh self.bcs = bcs self.verbose = verbose self.degree = degree self.continuous = continuous self._psi_fn = psi_fn self.V_fn = V_fn self.order = order self.preserve_moments = preserve_moments # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF if swarm_degree is None: self.swarm_degree = degree else: self.swarm_degree = swarm_degree if swarm_continuous is None: self.swarm_continuous = continuous else: self.swarm_continuous = swarm_continuous if varsymbol is None: varsymbol = rf"u_{{ [{self.instance_number}] }}" # meshVariables are required for: # # u(t) - evaluation of u_fn at the current time # u*(t) - u_* evaluated from # psi is evaluated/stored at `order` timesteps. We can't # be sure if psi is a meshVariable or a function to be evaluated # but psi_star is reaching back through each evaluation and has to be a # meshVariable (storage) psi_star = [] self.psi_star = psi_star # Propagate units from psi_fn to psi_star if the model supports units. # Internal psi_star variables should match the user's variable units when possible, # but if no reference quantities are set, use unitless variables to avoid strict mode errors. psi_units = uw.get_units(psi_fn) # Check if the model can handle units (has reference quantities set) model = uw.get_default_model() if psi_units is not None and not model.has_units(): # Model doesn't have reference quantities - don't propagate units to internal vars psi_units = None for i in range(order): self.psi_star.append( uw.discretisation.MeshVariable( f"psi_star_sl_{self.instance_number}_{i}", self.mesh, vtype=vtype, degree=self.degree, continuous=self.continuous, varsymbol=rf"{{ {varsymbol}^{{ {'*'*(i+1)} }} }}", units=psi_units, # Inherit units from psi_fn (or None if model has no units) ) ) # Working variable that has a potentially different discretisation from psi_star # We project from this to psi_star and we use this variable to define the # advection sample points self._workVar = uw.discretisation.MeshVariable( f"W_{self.instance_number}_{i}", self.mesh, vtype=vtype, degree=self.swarm_degree, continuous=self.swarm_continuous, varsymbol=rf"{{ {varsymbol}^\nabla }}", units=psi_units, # Inherit units from psi_fn ) # We just need one swarm since this is inherently a sequential operation nswarm = uw.swarm.NodalPointSwarm(self._workVar, verbose) self._nswarm_psi = nswarm # The projection operator for mapping swarm values to the mesh - needs to be different for # each variable type, unfortunately ... if vtype == uw.VarType.SCALAR: self._psi_star_projection_solver = uw.systems.solvers.SNES_Projection( self.mesh, self.psi_star[0], verbose=False ) elif vtype == uw.VarType.VECTOR: self._psi_star_projection_solver = uw.systems.solvers.SNES_Vector_Projection( self.mesh, self.psi_star[0], verbose=False, ) elif vtype == uw.VarType.SYM_TENSOR or vtype == uw.VarType.TENSOR: self._WorkVarTP = uw.discretisation.MeshVariable( f"W_star_slcn_{self.instance_number}", self.mesh, vtype=uw.VarType.SCALAR, degree=degree, continuous=continuous, varsymbol=r"W^{*}", ) self._psi_star_projection_solver = uw.systems.solvers.SNES_Tensor_Projection( self.mesh, self.psi_star[0], self._WorkVarTP, verbose=False ) # We should find a way to add natural bcs here # (self.Unknowns.u carried as a symbol from solver to solver) self._psi_star_projection_solver.uw_function = self._workVar.sym self._psi_star_projection_solver.bcs = bcs self._psi_star_projection_solver.smoothing = smoothing self._smoothing = smoothing self.I = uw.maths.Integral(mesh, None) return
@property def psi_fn(self): r"""Current symbolic expression :math:`\psi` being tracked.""" return self._psi_fn @psi_fn.setter def psi_fn(self, new_fn): """Set the tracked expression.""" self._psi_fn = new_fn self._psi_star_projection_solver.uw_function = self._psi_fn return def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() display(Latex(rf"$\quad$History steps = {self.order}")) @property def effective_order(self): """Current effective BDF order, accounting for history startup. For BDF order k, k distinct history values are needed. During startup, ``effective_order`` ramps from 1 to ``self.order`` as successive solves populate the history slots with distinct values. """ return min(self.order, max(1, self._n_solves_completed + 1))
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions. """ # Evaluate psi_fn at psi_star node positions and store in psi_star[0] from underworld3.utilities.unit_aware_array import UnitAwareArray coords = self.psi_star[0].coords if hasattr(coords, "magnitude"): coords_nd = uw.non_dimensionalise(coords) if isinstance(coords_nd, UnitAwareArray): coords_nd = np.array(coords_nd) elif hasattr(coords_nd, 'magnitude'): coords_nd = coords_nd.magnitude else: coords_nd = coords try: eval_result = uw.function.evaluate(self.psi_fn, coords_nd) psi_units = self.psi_star[0].units if psi_units is not None and not isinstance(eval_result, UnitAwareArray): eval_result = UnitAwareArray(eval_result, units=psi_units) self.psi_star[0].array[...] = eval_result except Exception: self._psi_star_projection_solver.uw_function = self.psi_fn self._psi_star_projection_solver.smoothing = 0.0 self._psi_star_projection_solver.solve() # Copy to all other history slots for i in range(1, self.order): self.psi_star[i].array[...] = self.psi_star[0].array[...] self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
[docs] def update( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, dt_physical: Optional = None, ): """Update history (alias for ``update_pre_solve``).""" self.update_pre_solve(dt, evalf, verbose, dt_physical) return
[docs] def update_post_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, dt_physical: Optional[float] = None, ): """Post-solve: record timestep and increment solve counter.""" self._dt = dt # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt if self._n_solves_completed < self.order: self._n_solves_completed += 1 return
[docs] def update_pre_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, dt_physical: Optional[float] = None, ): """Sample upstream values along characteristics before solve. On the first call, automatically initialises history from the current field values so that bdf() returns zero on the first step. """ if not self._history_initialised: self.initialise_history() ## Progress from the oldest part of the history # 1. Copy the stored values down the chain in preparation for the next timestep # The history term is the nodel value of psi_fn offset back along the characteristics # according to the timestep. # That is: # # - psi_star[0] is the current value of psi_fn, sampled # at the location of the nodes in their previous position at t-\Delta t # # - psi_star[1] is the value of psi_star[0] from the previous timestep # sampled at the location of the nodes at t - \Delta t. (note this is approximately # equivalent to the value of psi_star[0] at t - 2\Delta t) # # - psi_star[2] etc if required ... # # First we copy the history, then we sample can sample upstream values if dt_physical is not None: phi = sympy.Min(1, dt / dt_physical) else: phi = sympy.sympify(1) for i in range(self.order - 1, 0, -1): self.psi_star[i].array[...] = ( phi * self.psi_star[i - 1].array[...] + (1 - phi) * self.psi_star[i].array[...] ) # 2. Compute the current value of psi_fn which we store in psi_star[0] # Note the need to do a try/except to handle unsupported evaluations # (e.g. of derivatives) # # CRITICAL FIX (2025-11-28): Handle coordinates correctly for unit-aware mode. # Previous bug: extracting .magnitude gives METERS (e.g., 1000000), but: # - mesh.get_closest_cells() expects [0-1] non-dimensional coords # - evaluate() assumes plain numpy is [0-1] non-dimensional # Solution: use uw.non_dimensionalise() for proper conversion, OR pass # unit-aware coords to evaluate() which handles conversion internally. from underworld3.utilities.unit_aware_array import UnitAwareArray psi_star_0_coords = self.psi_star[0].coords # For mesh internal operations, need non-dimensional [0-1] coordinates if hasattr(psi_star_0_coords, "magnitude"): # Unit-aware coords - need to non-dimensionalize (not just extract magnitude!) psi_star_0_coords_nd = uw.non_dimensionalise(psi_star_0_coords) # Extract to plain numpy for mesh operations if isinstance(psi_star_0_coords_nd, UnitAwareArray): psi_star_0_coords_nd = np.array(psi_star_0_coords_nd) elif hasattr(psi_star_0_coords_nd, 'magnitude'): psi_star_0_coords_nd = psi_star_0_coords_nd.magnitude else: psi_star_0_coords_nd = np.array(psi_star_0_coords_nd) else: # Plain numpy - assume already non-dimensional psi_star_0_coords_nd = psi_star_0_coords cellid = self.mesh.get_closest_cells( psi_star_0_coords_nd, ) # Move slightly within the chosen cell to avoid edge effects centroid_coords = self.mesh._centroids[cellid] shift = 0.001 node_coords_nd = (1.0 - shift) * psi_star_0_coords_nd[:, :] + shift * centroid_coords[ :, : ] try: # Use shifted ND coords to avoid quad mesh boundary issues # node_coords_nd is slightly shifted toward cell centroids (lines 703-709) # evaluate() treats plain numpy as ND [0-1] coordinates eval_result = uw.function.evaluate( self.psi_fn, node_coords_nd, evalf=evalf, ) # Wrap result with units if psi_star has units but eval didn't return UnitAwareArray psi_star_units = self.psi_star[0].units if psi_star_units is not None and not isinstance(eval_result, UnitAwareArray): eval_result = UnitAwareArray(eval_result, units=psi_star_units) self.psi_star[0].array[...] = eval_result except Exception: # Fallback to projection solver for expressions that can't be directly evaluated # (e.g., containing derivatives) self._psi_star_projection_solver.uw_function = self.psi_fn self._psi_star_projection_solver.smoothing = 0.0 self._psi_star_projection_solver.solve(verbose=verbose) # 3. Compute the upstream values from the psi_fn # We use the u_star variable as a working value here so we have to work backwards # so we don't over-write the history terms # # Convert dt to model units for numerical arithmetic # (after symbolic logic that may use dt with units) # Note: uw is already imported at module level (line 7) model = uw.get_default_model() # DIAGNOSTIC: Capture information about the unit system coords_template = self.psi_star[0].coords has_units = hasattr(coords_template, "magnitude") or hasattr(coords_template, "_magnitude") # Maintain unit system consistency: either keep everything with units or convert to non-dimensional if has_units: # Physical coordinate system with units # dt must be converted to base SI seconds so that dt * velocity(m/s) = distance(m) if hasattr(dt, "to"): # It's a Pint quantity dt_for_calc = dt.to("second") # Convert to seconds (still a quantity) else: # If dt is already a dimensionless number, treat it as seconds dt_for_calc = dt else: # Non-dimensional coordinate system - convert dt to non-dimensional # CRITICAL: Actually non-dimensionalize the timestep! if hasattr(dt, "magnitude") or hasattr(dt, "value"): # dt has units - non-dimensionalize it dt_nondim = uw.non_dimensionalise(dt, model) # Extract the dimensionless value if hasattr(dt_nondim, "magnitude"): dt_for_calc = float(dt_nondim.magnitude) elif hasattr(dt_nondim, "value"): dt_for_calc = float(dt_nondim.value) else: dt_for_calc = float(dt_nondim) else: # Already dimensionless dt_for_calc = dt for i in range(self.order - 1, -1, -1): # 2nd order update along characteristics # Use shifted ND coords to avoid quad mesh boundary issues # node_coords_nd is slightly shifted toward cell centroids (lines 703-709) # evaluate() treats plain numpy as ND [0-1] coordinates v_result = uw.function.evaluate( self.V_fn, node_coords_nd, ) # CRITICAL: Preserve UnitAwareArray through slicing # Slicing can sometimes return plain numpy views - need to preserve wrapper from underworld3.utilities.unit_aware_array import UnitAwareArray if isinstance(v_result, UnitAwareArray): # Slice and rewrap to preserve units v_at_node_pts = v_result[:, 0, :] if not isinstance(v_at_node_pts, UnitAwareArray): # Slicing lost the wrapper - rewrap it v_at_node_pts = UnitAwareArray(v_at_node_pts, units=v_result.units) else: v_at_node_pts = v_result[:, 0, :] # Non-dimensionalize velocities when working with dimensionless coordinates # This prevents dimensional mismatch: velocities in m/s mixed with coords in [0,1] # CRITICAL: evaluate now returns UnitAwareArray with units attached # Check if velocities already have units before trying to add them manually if not has_units: # Coordinates are dimensionless - need to non-dimensionalize velocities too if isinstance(v_at_node_pts, UnitAwareArray): # Velocities already have units from evaluate - just non-dimensionalize v_nondim = uw.non_dimensionalise(v_at_node_pts, model) # Extract numpy array for dimensionless calculation if isinstance(v_nondim, UnitAwareArray): v_at_node_pts = np.array(v_nondim) elif hasattr(v_nondim, "value"): v_at_node_pts = v_nondim.value else: v_at_node_pts = v_nondim else: # Velocities don't have units - try to add them manually (legacy path) v_units = uw.get_units(self.V_fn) if v_units and v_units != "dimensionless": v_with_units = UnitAwareArray(v_at_node_pts, units=v_units) v_nondim = uw.non_dimensionalise(v_with_units, model) if isinstance(v_nondim, UnitAwareArray): v_at_node_pts = np.array(v_nondim) elif hasattr(v_nondim, "value"): v_at_node_pts = v_nondim.value else: v_at_node_pts = v_nondim else: # Dimensional mode - ensure velocities have units # CRITICAL FIX (2025-11-27): Variable data is stored NON-DIMENSIONALLY. # We must DIMENSIONALIZE (not just wrap) the values before dimensional arithmetic. # Previous bug: wrapping 0.01 (ND) with cm/yr gave 0.01 cm/yr instead of 1 cm/yr. if not isinstance(v_at_node_pts, UnitAwareArray): v_units = uw.get_units(self.V_fn) if v_units and v_units != "dimensionless": # Re-dimensionalize using the scaling system if uw.is_nondimensional_scaling_active(): from underworld3.scaling import dimensionalise # dimensionalise(nd_value, units) -> value * scale in those units v_dimensional = dimensionalise(v_at_node_pts, v_units) v_at_node_pts = UnitAwareArray(v_dimensional.magnitude, units=v_dimensional.units) else: # No scaling active - assume values are already dimensional v_at_node_pts = UnitAwareArray(v_at_node_pts, units=v_units) # Get coordinates coords = self.psi_star[i].coords # CRITICAL: When working in dimensionless mode, extract coords to plain arrays # to match the dimensionless velocities (otherwise unit mismatch occurs) from underworld3.utilities.unit_aware_array import UnitAwareArray if not has_units and isinstance(coords, UnitAwareArray): # Extract to plain numpy for dimensionless arithmetic coords = np.array(coords) # CRITICAL (2025-11-27): Multiply velocity FIRST so UnitAwareArray.__mul__ handles it. # If we do `dt_for_calc * v_at_node_pts`, Pint handles it and loses UnitAwareArray units. mid_pt_coords = coords - v_at_node_pts * (0.5 * dt_for_calc) v_mid_result = uw.function.global_evaluate( self.V_fn, mid_pt_coords, ) # CRITICAL: Preserve UnitAwareArray through slicing if isinstance(v_mid_result, UnitAwareArray): # Slice and rewrap to preserve units v_at_mid_pts = v_mid_result[:, 0, :] if not isinstance(v_at_mid_pts, UnitAwareArray): # Slicing lost the wrapper - rewrap it v_at_mid_pts = UnitAwareArray(v_at_mid_pts, units=v_mid_result.units) else: v_at_mid_pts = v_mid_result[:, 0, :] # Non-dimensionalize mid-point velocities when working with dimensionless coordinates # CRITICAL: global_evaluate now returns UnitAwareArray with units attached # Check if velocities already have units before trying to add them manually if not has_units: # Coordinates are dimensionless - need to non-dimensionalize velocities too if isinstance(v_at_mid_pts, UnitAwareArray): # Velocities already have units from global_evaluate - just non-dimensionalize v_nondim = uw.non_dimensionalise(v_at_mid_pts, model) # Extract numpy array for dimensionless calculation if isinstance(v_nondim, UnitAwareArray): v_at_mid_pts = np.array(v_nondim) elif hasattr(v_nondim, "value"): v_at_mid_pts = v_nondim.value else: v_at_mid_pts = v_nondim else: # Velocities don't have units - try to add them manually (legacy path) v_units = uw.get_units(self.V_fn) if v_units and v_units != "dimensionless": v_with_units = UnitAwareArray(v_at_mid_pts, units=v_units) v_nondim = uw.non_dimensionalise(v_with_units, model) if isinstance(v_nondim, UnitAwareArray): v_at_mid_pts = np.array(v_nondim) elif hasattr(v_nondim, "value"): v_at_mid_pts = v_nondim.value else: v_at_mid_pts = v_nondim else: # Dimensional mode - ensure velocities have units # CRITICAL: If V_fn doesn't have unit metadata, evaluate() returns plain numpy # We need to manually wrap it with units for dimensional arithmetic to work if not isinstance(v_at_mid_pts, UnitAwareArray): v_units = uw.get_units(self.V_fn) if v_units and v_units != "dimensionless": # Wrap velocities with their proper units v_at_mid_pts = UnitAwareArray(v_at_mid_pts, units=v_units) # Calculate upstream coordinates: current position - velocity * timestep end_pt_coords = coords - v_at_mid_pts * dt_for_calc # Extract scalar from (1,1) Matrix for scalar variables # MeshVariable.sym returns Matrix([[value]]) for scalars expr_to_evaluate = self.psi_star[i].sym if hasattr(expr_to_evaluate, 'shape') and expr_to_evaluate.shape == (1, 1): expr_to_evaluate = expr_to_evaluate[0, 0] # Evaluate psi_star at upstream coordinates # global_evaluate now returns dimensional results (gateway fix 2025-11-28) value_at_end_points = uw.function.global_evaluate( expr_to_evaluate, end_pt_coords, ) # CRITICAL FIX (2025-11-27): If psi_star has units, ensure the assigned # value also has units. global_evaluate may return plain arrays. psi_star_units = self.psi_star[i].units if psi_star_units is not None and not isinstance(value_at_end_points, UnitAwareArray): value_at_end_points = UnitAwareArray(value_at_end_points, units=psi_star_units) self.psi_star[i].array[...] = value_at_end_points # disable this for now - Compute moments before update if 0 and self.preserve_moments and self._workVar.num_components == 1: self.I.fn = self.psi_star[i].sym[0] Imean0 = self.I.evaluate() self.I.fn = (self.psi_star[i].sym[0] - Imean0) ** 2 IL20 = np.sqrt(self.I.evaluate()) # disable this for now - Restore moments after update if 0 and self.preserve_moments and self._workVar.num_components == 1: self.I.fn = self.psi_star[i].sym[0] Imean = self.I.evaluate() self.I.fn = (self.psi_star[i].sym[0] - Imean) ** 2 IL2 = np.sqrt(self.I.evaluate()) # TODO: DELETE remove swarm.access / data, replace with direct array assignment # with self.mesh.access(self.psi_star[i]): # self.psi_star[i].data[...] += Imean0 - Imean self.psi_star[i].array[...] += Imean0 - Imean self.I.fn = (self.psi_star[i].sym[0] - Imean0) ** 2 IL2 = np.sqrt(self.I.evaluate()) # TODO: DELETE remove swarm.access / data, replace with direct array assignment # with self.mesh.access(self.psi_star[i]): # self.psi_star[i].data[...] = ( # self.psi_star[i].data[...] - Imean0 # ) * IL20 / IL2 + Imean0 self.psi_star[i].array[...] = ( self.psi_star[i].array[...] - Imean0 ) * IL20 / IL2 + Imean0 return
[docs] def bdf(self, order=None): r"""Backwards differentiation form for calculating DuDt. Note that you will need ``bdf`` / :math:`\delta t` in computing derivatives. When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup. For order > 1 with variable timesteps, coefficients are adjusted automatically. """ if order is None: order = self.effective_order else: order = max(1, min(self.effective_order, order)) coeffs = _bdf_coefficients(order, self._dt, self._dt_history) # Build BDF sum: c0 * psi + c1 * psi_star[0].sym + ... with sympy.core.evaluate(True): bdf0 = coeffs[0] * self.psi_fn for i in range(1, len(coeffs)): bdf0 = bdf0 + coeffs[i] * self.psi_star[i - 1].sym return bdf0
[docs] def adams_moulton_flux(self, order=None): r"""Adams-Moulton flux approximation for implicit time integration. When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup. Parameters ---------- order : int, optional Order of the approximation (0-3). Defaults to ``effective_order``. Returns ------- sympy.Basic Weighted average of :math:`\psi` and history terms. """ if order is None: order = self.effective_order else: order = max(0, min(self.effective_order, order)) with sympy.core.evaluate(True): if order == 0: am = self.psi_fn elif order == 1: am = (self.psi_fn + self.psi_star[0].sym) / 2 elif order == 2: am = (5 * self.psi_fn + 8 * self.psi_star[0].sym - self.psi_star[1].sym) / 12 elif order == 3: am = ( 9 * self.psi_fn + 19 * self.psi_star[0].sym - 5 * self.psi_star[1].sym + self.psi_star[2].sym ) / 24 return am
## Consider Deprecating this one - it is the same as the Lagrangian_Swarm but ## sets up the swarm for itself. This does not have a practical use-case - the swarm version ## is slower, more cumbersome, and less stable / accurate. The only reason to use ## it is if there is an existing swarm that we can re-purpose.
[docs] class Lagrangian(uw_object): r""" Swarm-based Lagrangian history manager for material tracking. Manages the update of a Lagrangian variable :math:`\psi` on a swarm across timesteps. Creates and manages its own internal swarm for tracking material properties through the flow. .. math:: \psi_p^{t-n\Delta t} \leftarrow \psi_p^{t-(n-1)\Delta t} \psi_p^{t-(n-1)\Delta t} \leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \psi_p^{t-\Delta t} \leftarrow \psi_p^{t} The Lagrangian approach follows material points through the flow, avoiding numerical diffusion in advection. History values are stored on swarm variables with proxy mesh variables for solver integration. Parameters ---------- mesh : underworld3.discretisation.Mesh The computational mesh. psi_fn : sympy.Function The quantity to track (e.g., stress tensor ``stokes.stress``). V_fn : sympy.Function Velocity field for particle advection. vtype : VarType Variable type (SCALAR, VECTOR, SYM_TENSOR, etc.). degree : int Polynomial degree for proxy mesh variables. continuous : bool Whether proxy variables are continuous across elements. varsymbol : str, default=r"u" LaTeX symbol for display. verbose : bool, default=False Enable verbose output during updates. bcs : list, default=[] Boundary conditions (currently unused for swarm variables). order : int, default=1 Number of history timesteps to store. smoothing : float, default=0.0 Smoothing parameter for projections. fill_param : int, default=3 Fill parameter for swarm population density. Notes ----- The Lagrangian method is ideal for: - Viscoelastic stress history tracking - Material property advection without diffusion - Tracking compositional fields The internal swarm is automatically advected during updates. See Also -------- SemiLagrangian : Mesh-based semi-Lagrangian with departure points. Lagrangian_Swarm : For user-provided swarms. """ instances = ( 0 # count how many of these there are in order to create unique private mesh variable ids )
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, psi_fn: sympy.Function, V_fn: sympy.Function, vtype: uw.VarType, degree: int, continuous: bool, varsymbol: Optional[str] = r"u", verbose: Optional[bool] = False, bcs=[], order=1, smoothing=0.0, fill_param=3, ): super().__init__() # create a new swarm to manage here dudt_swarm = uw.swarm.UWSwarm(mesh) self.mesh = mesh self.swarm = dudt_swarm self.psi_fn = psi_fn self.V_fn = V_fn self.verbose = verbose self.order = order # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF psi_star = [] self.psi_star = psi_star for i in range(order): self.psi_star.append( uw.swarm.SwarmVariable( f"psi_star_sw_{self.instance_number}_{i}", self.swarm, vtype=vtype, proxy_degree=degree, proxy_continuous=continuous, varsymbol=rf"{varsymbol}^{{ {'*'*(i+1)} }}", ) ) dudt_swarm.populate(fill_param) return
def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() ## feedback on this instance display(Latex(r"$\quad\psi = $ " + self.psi._repr_latex_())) display( Latex( r"$\quad\Delta t_{\textrm{phys}} = $ " + sympy.sympify(self.dt_physical)._repr_latex_() ) ) display(Latex(rf"$\quad$History steps = {self.order}")) @property def effective_order(self): """Current effective BDF order, accounting for history startup.""" return min(self.order, max(1, self._n_solves_completed + 1))
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions. """ psi_star_0 = self.psi_star[0] with self.swarm.access(psi_star_0): for i in range(psi_star_0.shape[0]): for j in range(psi_star_0.shape[1]): updated_psi = uw.function.evaluate( self.psi_fn[i, j], self.swarm.data, ) psi_star_0[i, j].data[:] = updated_psi # Copy to all other history slots for k in range(1, self.order): with self.swarm.access(self.psi_star[k]): self.psi_star[k].data[...] = psi_star_0.data[...] self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
## Note: We may be able to eliminate this ## The SL updater and the Lag updater have ## different sequencing because of the way they ## update the history. It makes more sense for the ## full Lagrangian swarm to be updated after the solve ## and this means we have to grab the history values first.
[docs] def update( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Update history (alias for ``update_post_solve``).""" self.update_post_solve(dt, evalf, verbose) return
[docs] def update_pre_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Pre-solve: auto-initialise history on first call.""" if not self._history_initialised: self.initialise_history() return
[docs] def update_post_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Shift history chain and advect swarm after solve.""" self._dt = dt # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt for h in range(self.order - 1): i = self.order - (h + 1) # copy the information down the chain if verbose: print(f"Lagrange order = {self.order}") print(f"Lagrange copying {i-1} to {i}") self.psi_star[i].array[...] = self.psi_star[i - 1].array[...] # Now update the swarm variable psi_star_0 = self.psi_star[0] with self.swarm.access(psi_star_0): for i in range(psi_star_0.shape[0]): for j in range(psi_star_0.shape[1]): updated_psi = uw.function.evaluate( self.psi_fn[i, j], self.swarm.data, evalf=evalf, ) psi_star_0[i, j].data[:] = updated_psi # Now update the swarm locations self.swarm.advection( self.V_fn, delta_t=dt, restore_points_to_domain_func=self.mesh.return_coords_to_bounds, ) if self._n_solves_completed < self.order: self._n_solves_completed += 1
[docs] def bdf(self, order=None): r"""Backwards differentiation form for calculating DuDt. Note that you will need ``bdf`` / :math:`\delta t` in computing derivatives. When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup. For order > 1 with variable timesteps, coefficients are adjusted automatically. """ if order is None: order = self.effective_order else: order = max(1, min(self.effective_order, order)) coeffs = _bdf_coefficients(order, self._dt, self._dt_history) with sympy.core.evaluate(True): bdf0 = coeffs[0] * self.psi_fn for i in range(1, len(coeffs)): bdf0 = bdf0 + coeffs[i] * self.psi_star[i - 1].sym return bdf0
[docs] def adams_moulton_flux(self, order=None): r"""Adams-Moulton flux approximation for implicit time integration. When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup. Parameters ---------- order : int, optional Order of the approximation (0-3). Defaults to ``effective_order``. Returns ------- sympy.Basic Weighted average of :math:`\psi` and history terms. """ if order is None: order = self.effective_order else: order = max(1, min(self.effective_order, order)) with sympy.core.evaluate(True): if order == 0: # Special case - no history term am = self.psi_fn elif order == 1: am = (self.psi_fn + self.psi_star[0].sym) / 2 elif order == 2: am = (5 * self.psi_fn + 8 * self.psi_star[0].sym - self.psi_star[1].sym) / 12 elif order == 3: am = ( 9 * self.psi_fn + 19 * self.psi_star[0].sym - 5 * self.psi_star[1].sym + self.psi_star[2].sym ) / 24 return am
[docs] class Lagrangian_Swarm(uw_object): r""" Swarm-based Lagrangian history manager (user-provided swarm). Manages the update of a Lagrangian variable :math:`\psi` on a user-supplied swarm across timesteps: .. math:: \psi_p^{t-n\Delta t} \leftarrow \psi_p^{t-(n-1)\Delta t} \psi_p^{t-(n-1)\Delta t} \leftarrow \psi_p^{t-(n-2)\Delta t} \cdots \psi_p^{t-\Delta t} \leftarrow \psi_p^{t} Unlike ``Lagrangian``, this class uses a user-provided swarm rather than creating its own. The swarm should already be populated and configured for tracking material points. Parameters ---------- swarm : underworld3.swarm.Swarm User-provided swarm for material point tracking. psi_fn : sympy.Function The quantity to track (e.g., stress tensor from a solver). vtype : underworld3.VarType Variable type (SCALAR, VECTOR, SYM_TENSOR, etc.). degree : int Interpolation degree for proxy mesh variables. continuous : bool Whether proxy mesh variables should be continuous. varsymbol : str, optional LaTeX symbol for display (default ``"u"``). verbose : bool, optional Enable verbose output (default ``False``). bcs : list, optional Boundary conditions (currently unused, default ``[]``). order : int, optional Order of time integration (1-3) (default ``1``). smoothing : float, optional Smoothing parameter (default ``0.0``). step_averaging : int, optional Number of steps for history averaging (default ``2``). Attributes ---------- mesh : underworld3.discretisation.Mesh Reference to the computational mesh (from swarm). swarm : underworld3.swarm.Swarm The user-provided swarm. psi_fn : sympy.Function Symbolic expression for the tracked quantity. order : int Order of BDF integration. step_averaging : int Number of steps for averaging (affects BDF scaling). psi_star : list History values :math:`\psi^*, \psi^{**}, \ldots` as swarm variables. Notes ----- Key differences from ``Lagrangian`` class: - Uses user-provided swarm (not internally created) - Swarm advection is NOT performed (user's responsibility) - Step averaging for smoothing history updates - Suitable when swarm is shared between multiple history managers The ``step_averaging`` parameter scales the BDF formula to account for updates that occur over multiple sub-steps within a timestep. See Also -------- Lagrangian : Creates and manages its own swarm with advection. SemiLagrangian : Nodal-swarm approach for advection-dominated problems. Eulerian : Pure mesh-based history (no particle tracking). """ instances = ( 0 # count how many of these there are in order to create unique private mesh variable ids )
[docs] @timing.routine_timer_decorator def __init__( self, swarm: uw.swarm.Swarm, psi_fn: sympy.Function, vtype: uw.VarType, degree: int, continuous: bool, varsymbol: Optional[str] = r"u", verbose: Optional[bool] = False, bcs=[], order=1, smoothing=0.0, step_averaging=2, ): super().__init__() self.mesh = swarm.mesh self.swarm = swarm self.psi_fn = psi_fn self.verbose = verbose self.order = order self.step_averaging = step_averaging # History tracking: deferred initialization and effective order self._history_initialised = False self._n_solves_completed = 0 self._dt = None # current timestep (set by solver or update_pre_solve) self._dt_history = [None] * order # previous timesteps for variable-dt BDF psi_star = [] self.psi_star = psi_star for i in range(order): self.psi_star.append( uw.swarm.SwarmVariable( f"psi_star_sw_{self.instance_number}_{i}", self.swarm, vtype=vtype, proxy_degree=degree, proxy_continuous=continuous, varsymbol=rf"{varsymbol}^{{ {'*'*(i+1)} }}", ) ) return
def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() ## feedback on this instance display(Latex(r"$\quad\psi = $ " + self.psi._repr_latex_())) display( Latex( r"$\quad\Delta t_{\textrm{phys}} = $ " + sympy.sympify(self.dt_physical)._repr_latex_() ) ) display(Latex(rf"$\quad$History steps = {self.order}")) @property def effective_order(self): """Current effective BDF order, accounting for history startup.""" return min(self.order, max(1, self._n_solves_completed + 1))
[docs] def initialise_history(self): r"""Initialize all history slots to the current value of :math:`\psi`. Called automatically on the first ``update_pre_solve``. Can also be called manually after setting initial conditions. """ psi_star_0 = self.psi_star[0] with self.swarm.access(psi_star_0): for i in range(psi_star_0.shape[0]): for j in range(psi_star_0.shape[1]): updated_psi = uw.function.evaluate( self.psi_fn[i, j], self.swarm.data, ) psi_star_0[i, j].data[:] = updated_psi # Copy to all other history slots for k in range(1, self.order): with self.swarm.access(self.psi_star[k]): self.psi_star[k].data[...] = psi_star_0.data[...] self._history_initialised = True return
[docs] def initiate_history_fn(self): """Deprecated: use ``initialise_history`` instead.""" self.initialise_history()
## Note: We may be able to eliminate this ## The SL updater and the Lag updater have ## different sequencing because of the way they ## update the history. It makes more sense for the ## full Lagrangian swarm to be updated after the solve ## and this means we have to grab the history values first.
[docs] def update( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Update history (alias for ``update_post_solve``).""" self.update_post_solve(dt, evalf, verbose) return
[docs] def update_pre_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): """Pre-solve: auto-initialise history on first call.""" if not self._history_initialised: self.initialise_history() return
[docs] def update_post_solve( self, dt: float, evalf: Optional[bool] = False, verbose: Optional[bool] = False, ): r"""Shift history chain and evaluate current :math:`\psi` on swarm.""" self._dt = dt # Record timestep history for variable-dt BDF for i in range(self.order - 1, 0, -1): self._dt_history[i] = self._dt_history[i - 1] self._dt_history[0] = dt for h in range(self.order - 1): i = self.order - (h + 1) # copy the information down the chain if verbose: print(f"Lagrange swarm order = {self.order}", flush=True) print( f"Mesh interpolant order = {self.psi_star[0]._meshVar.degree}", flush=True, ) print(f"Lagrange swarm copying {i-1} to {i}", flush=True) with self.swarm.access(self.psi_star[i]): self.psi_star[i].data[...] = self.psi_star[i - 1].data[...] phi = 1 / self.step_averaging psi_star_0 = self.psi_star[0] with self.swarm.access(psi_star_0): for i in range(psi_star_0.shape[0]): for j in range(psi_star_0.shape[1]): updated_psi = uw.function.evaluate( self.psi_fn[i, j], self.swarm.data, evalf=evalf, ) psi_star_0[i, j].data[:] = ( phi * updated_psi + (1 - phi) * psi_star_0[i, j].data[:] ) if self._n_solves_completed < self.order: self._n_solves_completed += 1 return
[docs] def bdf(self, order=None): r"""Backwards differentiation form for calculating DuDt. Note that you will need ``bdf`` / :math:`\delta t` in computing derivatives. When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup. For order > 1 with variable timesteps, coefficients are adjusted automatically. """ if order is None: order = self.effective_order else: order = max(1, min(self.effective_order, order)) coeffs = _bdf_coefficients(order, self._dt, self._dt_history) with sympy.core.evaluate(False): bdf0 = coeffs[0] * self.psi_fn for i in range(1, len(coeffs)): bdf0 = bdf0 + coeffs[i] * self.psi_star[i - 1].sym bdf0 /= self.step_averaging # This is actually calculated over several steps so scaling is required return bdf0
[docs] def adams_moulton_flux(self, order=None): r"""Adams-Moulton flux approximation for implicit time integration. When ``order`` is not specified, uses ``effective_order`` which ramps up from 1 during startup. Parameters ---------- order : int, optional Order of the approximation (1-3). Defaults to ``effective_order``. Returns ------- sympy.Basic Weighted average of :math:`\psi` and history terms. """ if order is None: order = self.effective_order else: order = max(1, min(self.effective_order, order)) with sympy.core.evaluate(False): if order == 1: am = (self.psi_fn + self.psi_star[0].sym) / 2 elif order == 2: am = (5 * self.psi_fn + 8 * self.psi_star[0].sym - self.psi_star[1].sym) / 12 elif order == 3: am = ( 9 * self.psi_fn + 19 * self.psi_star[0].sym - 5 * self.psi_star[1].sym + self.psi_star[2].sym ) / 24 return am