Source code for underworld3.systems.solvers

r"""
PDE solvers for Underworld3.

This module provides finite element solvers for common PDEs in geodynamics
and computational mechanics. All solvers use PETSc's SNES (Scalable Nonlinear
Equation Solvers) framework.

Scalar Equations
----------------
SNES_Poisson
    Poisson/diffusion equation: :math:`\nabla \cdot (k \nabla T) = f`
SNES_Darcy
    Darcy flow: pressure-driven flow through porous media
SNES_TransientDarcy
    Transient groundwater flow with constant storage
SNES_Richards
    Richards equation for variably-saturated porous media flow

Vector Equations
----------------
SNES_Stokes
    Stokes flow: :math:`\nabla \cdot \boldsymbol{\tau} - \nabla p = f`
SNES_VE_Stokes
    Viscoelastic Stokes with stress history
SNES_Stokes_SaddlePt
    Stokes with saddle-point preconditioner

Transport Equations
-------------------
SNES_AdvectionDiffusion
    Scalar advection-diffusion with Lagrangian tracking

Notes
-----
Solvers support unit-aware calculations when reference quantities are
set on the model. Physical units are preserved through the solve process
and timestep estimates are returned with appropriate time units.

See Also
--------
underworld3.systems.ddt : Time derivative schemes for transient problems.
underworld3.constitutive_models : Material constitutive laws.

Examples
--------
Set up a Stokes solver:

>>> stokes = uw.systems.Stokes(mesh, velocityField=v, pressureField=p)
>>> stokes.constitutive_model = uw.constitutive_models.ViscousFlowModel
>>> stokes.viscosity = 1e21
>>> stokes.solve()
"""

import sympy
from sympy import sympify
import numpy as np

from typing import Optional, Callable, Union

import underworld3 as uw
from underworld3.systems import SNES_Scalar, SNES_Vector, SNES_Stokes_SaddlePt
from underworld3.cython.generic_solvers import SNES_MultiComponent
from underworld3 import VarType
import underworld3.timing as timing
from underworld3.utilities import memprobe
from underworld3.utilities._api_tools import (
    uw_object,
    SymbolicProperty,
    Parameter,
    Template,
    ExpressionProperty,
)

from underworld3.function import expression as public_expression

expression = lambda *x, **X: public_expression(*x, _unique_name_generation=True, **X)


def _apply_unit_aware_scaling(dt_nondimensional, field, mesh):
    """
    Helper function to apply unit-aware scaling to timestep estimates.

    Detects the units of the velocity field and applies appropriate time scaling
    to convert nondimensional timestep to physical time units.

    Parameters
    ----------
    dt_nondimensional : float or np.ndarray
        The nondimensional timestep estimate
    field : MeshVariable or SymPy expression (often a Matrix)
        The velocity field - units are detected from this
    mesh : Mesh
        The mesh (may have reference to model with time scales)

    Returns
    -------
    float or UWQuantity
        Timestep with physical time units if detectable, otherwise nondimensional
    """
    try:
        from ..function.quantities import UWQuantity
        from ..units import get_units
        import sympy

        # Extract a component from field if it's a Matrix (common for velocity)
        field_to_check = field
        if isinstance(field, sympy.MatrixBase):
            # Extract first component: V_fn[0] or V_fn[0,0]
            if field.shape[0] > 0:
                field_to_check = field[0] if len(field.shape) == 1 else field[0, 0]

        # Try to get units from the field expression
        field_units = get_units(field_to_check)

        if field_units is not None:
            # Field has units - verify it has time dimension (as expected for velocity)
            # Get dimensionality: e.g., {'[length]': 1, '[time]': -1} for velocity
            field_dimensionality = field_units.dimensionality

            # Check if this has time dimension (velocity should have time^-1)
            if '[time]' in field_dimensionality:
                # Velocity field has time dimension - use model time scale for result
                # Don't try to match the velocity's specific time units (fragile string parsing)
                # Instead, always return in model's fundamental time scale and let user convert
                model = uw.get_default_model()
                if model and hasattr(model, "fundamental_scales"):
                    model_time_scale = model.fundamental_scales.get("time")
                    if model_time_scale is not None:
                        # Apply scaling: dt_physical = dt_nd * time_scale
                        dt_physical = dt_nondimensional * model_time_scale

                        # Return as UWQuantity
                        if not isinstance(dt_physical, UWQuantity):
                            dt_physical = UWQuantity._from_pint(dt_physical)
                        return dt_physical

        # Fallback: check if model has time scales (old behavior)
        model = uw.get_default_model()
        if model and hasattr(model, "fundamental_scales") and model.fundamental_scales:
            time_scale = model.fundamental_scales.get("time")
            if time_scale is not None:
                dt_physical = dt_nondimensional * time_scale
                if not isinstance(dt_physical, UWQuantity):
                    dt_physical = UWQuantity._from_pint(dt_physical)
                return dt_physical

    except Exception as e:
        # Silently fall back to nondimensional
        pass

    return dt_nondimensional


from .ddt import SemiLagrangian as SemiLagrangian_DDt
from .ddt import Lagrangian as Lagrangian_DDt
from .ddt import Eulerian as Eulerian_DDt
from .ddt import Symbolic as Symbolic_DDt


[docs] class SNES_Poisson(SNES_Scalar): r"""Poisson equation solver. Provides a discrete representation of the Poisson equation: .. math:: \nabla \cdot \left[ \boldsymbol{\kappa} \nabla u \right] = f where :math:`\mathbf{F} = \boldsymbol{\kappa} \nabla u` relates the flux to gradients in the unknown :math:`u`. Parameters ---------- mesh : Mesh The computational mesh. u_Field : MeshVariable, optional Pre-existing mesh variable for the solution. If None, one is created. verbose : bool, optional Enable verbose output during solve. degree : int, optional Polynomial degree for the solution field (default: 2). DuDt : SemiLagrangian_DDt or Lagrangian_DDt, optional Time derivative operator for time-dependent problems. DFDt : SemiLagrangian_DDt or Lagrangian_DDt, optional Time derivative operator for the flux. """
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, u_Field: uw.discretisation.MeshVariable = None, verbose=False, degree=2, DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, ): ## Keep track ## Parent class will set up default values etc super().__init__( mesh, u_Field, degree, verbose, DuDt=DuDt, DFDt=DFDt, ) # default values for properties self.f = sympy.Matrix.zeros(1, 1) self._constitutive_model = None
# ========================================================================= # PETSc Residual Templates # For the Poisson equation: -∇·(k∇u) = f # ========================================================================= F0 = Template( r"f_0 \left( \mathbf{u} \right)", lambda self: -self.f, r"""Source term for the Poisson equation (pointwise). The $f_0$ term represents the source/sink in the diffusion equation. Set via the ``f`` property (e.g., ``poisson.f = heat_source``). """, ) F1 = Template( r"\mathbf{F}_1\left( \mathbf{u} \right)", lambda self: self.constitutive_model.flux.T, r"""Diffusive flux term for the Poisson equation (pointwise). The $\mathbf{F}_1$ vector represents the flux $k \nabla u$ from the constitutive model, where $k$ is the diffusivity. """, )
[docs] @timing.routine_timer_decorator def poisson_problem_description(self): """Build residual terms for Poisson FEM assembly.""" # f1 residual term (weighted integration) - scalar function self._f0 = self.F0.sym # f1 residual term (integration by parts / gradients) # isotropic self._f1 = self.F1.sym return
@property def f(self): r"""Source term for the Poisson equation. The source term :math:`f` appears on the right-hand side: .. math:: \nabla \cdot (\kappa \nabla u) = f Returns ------- sympy.Matrix Source term expression (scalar, shape ``(1, 1)``). See Also -------- constitutive_model : Provides the diffusivity :math:`\kappa`. """ return self._f @f.setter def f(self, value): """Set the source term (handles units and scaling).""" self._needs_function_rewire = True # Handle UWQuantity with units - enforce "units everywhere" principle if hasattr(value, "value") and hasattr(value, "units"): # Extract the plain value plain_value = float(value.value) # If ND scaling is active, scale the constant import underworld3 as uw if uw.is_nondimensional_scaling_active(): # The source term should have same dimensionality as the unknown field # Access via self.Unknowns.u (Poisson) or self.Unknowns.DuDt.u (Stokes) u = None if hasattr(self, "Unknowns") and hasattr(self.Unknowns, "u"): u = self.Unknowns.u if u is not None and hasattr(u, "scaling_coefficient"): scale = u.scaling_coefficient if scale != 1.0 and scale != 0.0: plain_value = plain_value / scale self._f = sympy.Matrix((plain_value,)) # Also accept plain SymPy expressions (for variables, etc.) elif hasattr(value, "atoms"): # It's a SymPy expression self._f = sympy.Matrix((value,)) # Handle plain numbers - apply "units everywhere or nowhere" principle # Exception: allow zero always (dimensionless/no source) elif isinstance(value, (int, float)): if value == 0.0 or value == 0: # Zero is allowed - no source term self._f = sympy.Matrix((0.0,)) else: # Check if model has reference quantities defined # If yes: enforce units everywhere # If no: allow plain numbers (user is responsible for consistency) import underworld3 as uw model = uw.get_default_model() if model.has_units(): # Reference quantities defined - enforce units everywhere raise ValueError( f"Units requirement enforced: Model has reference quantities defined.\n" f"Source term 'f' requires units. Use:\n" f" solver.f = uw.quantity({value}, 'appropriate_units')\n" f"Example: poisson.f = uw.quantity({value}, 'kelvin')" ) else: # No reference quantities - allow plain numbers # User is responsible for dimensional consistency self._f = sympy.Matrix((float(value),)) else: # Accept other types (might be symbolic) self._f = sympy.Matrix((value,)) @property def CM_is_setup(self): """Whether the constitutive model is configured for this solver.""" return self._constitutive_model._solver_is_setup
[docs] class SNES_Darcy(SNES_Scalar): r""" Darcy flow equation solver for groundwater problems. Provides a discrete representation of the groundwater flow equations: .. math:: \underbrace{S_s \frac{\partial h}{\partial t}}_{\dot{u}} - \nabla \cdot \underbrace{\left[ \boldsymbol{\kappa} \nabla h - \boldsymbol{s} \right]}_{\mathbf{F}} = \underbrace{W}_{h} The flux term :math:`\mathbf{F}` relates the effective velocity to pressure gradients: .. math:: \boldsymbol{v} = \boldsymbol{\kappa} \nabla h - \boldsymbol{s} Parameters ---------- mesh : Mesh The computational mesh. h_Field : MeshVariable, optional Mesh variable for hydraulic head. Created automatically if not provided. v_Field : MeshVariable, optional Mesh variable for Darcy velocity. Created automatically if not provided. degree : int, default=2 Polynomial degree for the finite element discretization. verbose : bool, default=False Enable verbose output. DuDt : optional Time derivative operator for the unknown. DFDt : optional Time derivative operator for the flux. Notes ----- - The unknown is :math:`h`, the hydraulic head - The permeability tensor :math:`\kappa` is set via the ``constitutive_model`` property using one of the ``uw.constitutive_models`` classes - :math:`W` is a pressure source term - :math:`S_s` is the specific storage coefficient - The time-dependent term :math:`\dot{f}` is not implemented in this version - The solver returns both the primary field and the Darcy flux (mean-flow velocity) See Also -------- SNES_Poisson : Related diffusion-only solver. uw.constitutive_models.DarcyFlowModel : Constitutive model for Darcy flow. """
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, h_Field: Optional[uw.discretisation.MeshVariable] = None, v_Field: Optional[uw.discretisation.MeshVariable] = None, degree: int = 2, verbose=False, DuDt=None, DFDt=None, ): ## Parent class will set up default values etc super().__init__( mesh, h_Field, degree, verbose, DuDt=DuDt, DFDt=DFDt, ) # default values for properties self._f = sympy.Matrix([0]) self._k = 1 self._constitutive_model = None ## Set up the projection operator that ## solves the flow rate self._v_projector = uw.systems.solvers.SNES_Vector_Projection( self.mesh, v_Field, degree=self.Unknowns.u.degree - 1, ) self._v = self._v_projector.Unknowns.u # If we add smoothing, it should be small self._v_projector.smoothing = 1.0e-6
# ========================================================================= # PETSc Residual Templates # For Darcy flow: -∇·(K∇p) = f, with velocity v = -K∇p # ========================================================================= F0 = Template( r"f_0 \left( \mathbf{u} \right)", lambda self: -self.f, r"""Source term for the Darcy pressure equation (pointwise). The $f_0$ term represents fluid sources/sinks. Set via the ``f`` property. """, ) F1 = Template( r"\mathbf{F}_1\left( \mathbf{u} \right)", lambda self: self.darcy_flux, r"""Darcy flux term (pointwise). The $\mathbf{F}_1$ vector is the Darcy flux $K \nabla p$, where $K$ is the permeability. """, )
[docs] @timing.routine_timer_decorator def darcy_problem_description(self): """Build residual terms for Darcy flow FEM assembly.""" # f1 residual term (weighted integration) self._f0 = self.F0.sym # f1 residual term (integration by parts / gradients) self._f1 = self.F1.sym # Flow calculation self._v_projector.uw_function = -self.F1.sym return
@property def f(self): """Source term for the Darcy equation.""" return self._f @f.setter def f(self, value): """Set the source term.""" self._needs_function_rewire = True self._f = sympy.Matrix((value,)) @property def darcy_flux(self): """Darcy flux velocity computed from pressure gradient.""" flux = self.constitutive_model.flux.T return flux @property def v(self): """Projected Darcy velocity field.""" return self._v @v.setter def v(self, value): """Set the velocity projection target.""" self._v_projector.is_setup = False self._v = sympify(value)
[docs] @timing.routine_timer_decorator def solve( self, zero_init_guess: bool = True, timestep: float = None, verbose: bool = False, _force_setup: bool = False, divergence_retries: int = 0, ): r"""Solve the Darcy flow system. Computes the pressure field and Darcy flux velocity. Parameters ---------- zero_init_guess : bool, optional If True (default), start from zero initial guess. If False, use current field values as initial guess. timestep : float, optional Timestep value for inertial terms (if applicable). verbose : bool, optional If True, print solver progress information. _force_setup : bool, optional Force re-setup of solver even if already configured. divergence_retries : int, optional If SNES reports DIVERGED, retry with warm start up to this many times. 0 preserves legacy behaviour. Notes ----- After solving, the pressure field ``self.u`` and velocity field ``self.v`` contain the solution. """ if (not self.is_setup) or _force_setup: self._setup_pointwise_functions(verbose) self._setup_discretisation(verbose) self._setup_solver(verbose) # Solve pressure super().solve(zero_init_guess, _force_setup, divergence_retries=divergence_retries) # Now solve flow field: v = -flux = -K(grad(h) - s) # self._v_projector.petsc_options["snes_rtol"] = 1.0e-6 # self._v_projector.petsc_options.delValue("ksp_monitor") self._v_projector.uw_function = -self.darcy_flux self._v_projector.solve(zero_init_guess) return
# @timing.routine_timer_decorator # def _setup_terms(self): # self._v_projector.uw_function = self.darcy_flux # self._v_projector._setup_terms() # _setup_pointwise_functions() # # super()._setup_terms() class SNES_TransientDarcy(SNES_Darcy): r""" Transient Darcy flow solver for time-dependent groundwater problems. Solves the transient groundwater flow equation: .. math:: S_s \frac{\partial h}{\partial t} - \nabla \cdot \left[ K (\nabla h - \mathbf{s}) \right] = f where :math:`S_s` is the specific storage (constant), :math:`K` is the hydraulic conductivity, :math:`\mathbf{s}` is the body force (gravity), and :math:`f` is a source/sink term. Inherits velocity projection from :class:`SNES_Darcy`. Parameters ---------- mesh : Mesh The computational mesh. h_Field : MeshVariable, optional Mesh variable for hydraulic head. v_Field : MeshVariable, optional Mesh variable for Darcy velocity. order : int, default=1 Time integration order (BDF/Adams-Moulton history depth). theta : float, default=0.5 Implicit time weighting (0=explicit, 0.5=Crank-Nicolson, 1=implicit). degree : int, default=2 Polynomial degree for finite element discretization. verbose : bool, default=False Enable verbose output. DuDt : optional Time derivative operator for the unknown. DFDt : optional Time derivative operator for the flux. Attributes ---------- storage : sympy.Expr Specific storage :math:`S_s` (default 1). delta_t : UWexpression Current timestep. See Also -------- SNES_Darcy : Steady-state parent solver. SNES_Richards : Nonlinear extension for unsaturated flow. """ @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, h_Field: Optional[uw.discretisation.MeshVariable] = None, v_Field: Optional[uw.discretisation.MeshVariable] = None, order: int = 1, theta: float = 0.5, degree: int = 2, verbose=False, DuDt=None, DFDt=None, ): super().__init__(mesh, h_Field, v_Field, degree, verbose, DuDt=DuDt, DFDt=DFDt) self.theta = theta self._delta_t = expression(R"\Delta t", 0, "Physically motivated timestep") self._storage = sympy.sympify(1) self._needs_function_rewire = True if DuDt is None: self.Unknowns.DuDt = Eulerian_DDt( self.mesh, h_Field, vtype=uw.VarType.SCALAR, degree=h_Field.degree, continuous=h_Field.continuous, varsymbol=h_Field.symbol, verbose=verbose, order=order, smoothing=0.0, ) else: if order is not None and DuDt.order < order: raise RuntimeError( f"DuDt supplied is order {DuDt.order} but order requested is {order}" ) self.Unknowns.DuDt = DuDt if DFDt is None: self.Unknowns.DFDt = Symbolic_DDt( sympy.Matrix([[0] * self.mesh.dim]), varsymbol=rf"{{F[ {self.u.symbol} ] }}", theta=theta, order=order, ) else: self.Unknowns.DFDt = DFDt # --- Properties --- @property def storage(self): """Specific storage coefficient :math:`S_s`.""" return self._storage @storage.setter def storage(self, value): self._needs_function_rewire = True self._storage = sympy.sympify(value) @property def delta_t(self): """Current timestep.""" return self._delta_t @delta_t.setter def delta_t(self, value): self._needs_function_rewire = True if hasattr(value, "value"): self._delta_t.sym = value.value elif hasattr(value, "magnitude"): self._delta_t.sym = value.magnitude else: self._delta_t.sym = value @property def DuDt(self): return self.Unknowns.DuDt @property def DFDt(self): return self.Unknowns.DFDt # --- Template expressions (override Darcy's steady-state Templates) --- @property def F0(self): """Pointwise storage + source: :math:`S_s \\dot{h} / \\Delta t - f`.""" f0 = expression( r"f_0(h)", -self.f + self.storage * self.DuDt.bdf() / self.delta_t, "Transient Darcy storage + source term", ) self._f0 = f0 return f0 @property def F1(self): """Pointwise flux: Adams-Moulton time-weighted Darcy flux.""" F1_val = expression( r"\mathbf{F}_1(h)", self.DFDt.adams_moulton_flux(), "Transient Darcy flux (time-weighted)", ) self._f1 = F1_val return F1_val # --- Timestep estimation --- @timing.routine_timer_decorator def estimate_dt(self): r""" Estimate a stable timestep based on diffusive CFL. Returns ------- float or pint.Quantity Diffusive timestep :math:`\delta t = (\Delta x)^2 / K_{\max}`. """ from mpi4py import MPI K = self.constitutive_model.K if isinstance(K, sympy.Expr) or hasattr(K, "sym"): K_sym = K.sym if hasattr(K, "sym") else K if uw.function.fn_is_constant_expr(K_sym): diffusivity = uw.function.evaluate( K_sym, np.zeros((1, self.mesh.dim)) ) else: diffusivity = uw.function.evaluate( sympy.sympify(K_sym), self.mesh._centroids, self.mesh.N ) diffusivity = diffusivity.max() else: diffusivity = K if hasattr(diffusivity, "units") and diffusivity.units is not None: diffusivity = uw.non_dimensionalise(diffusivity) elif hasattr(diffusivity, "magnitude"): diffusivity = diffusivity.magnitude diffusivity = float(np.asarray(diffusivity).max()) comm = uw.mpi.comm diffusivity_glob = comm.allreduce(diffusivity, op=MPI.MAX) min_dx = self.mesh.get_min_radius() if diffusivity_glob != 0.0: dt_diff = (min_dx**2) / diffusivity_glob else: dt_diff = np.inf return _apply_unit_aware_scaling(np.squeeze(dt_diff), self.u, self.mesh) # --- Solve --- @timing.routine_timer_decorator def solve( self, zero_init_guess: bool = True, timestep=None, _force_setup: bool = False, verbose=False, divergence_retries: int = 0, ): r""" Solve the transient Darcy system for one timestep. Parameters ---------- zero_init_guess : bool, optional Start from zero initial guess (default True). timestep : float, optional Timestep size. Updates ``self.delta_t`` if provided. _force_setup : bool, optional Force re-setup of solver. verbose : bool, optional Print solver progress. divergence_retries : int, optional If SNES reports DIVERGED, retry with warm start up to this many times. 0 preserves legacy behaviour. """ if timestep is not None and timestep != self.delta_t: self.delta_t = timestep if not self.constitutive_model._solver_is_setup: self._needs_function_rewire = True self.DFDt.psi_fn = self.constitutive_model.flux.T if not self.is_setup: self._setup_pointwise_functions(verbose) self._setup_discretisation(verbose) self._setup_solver(verbose) # Pre-solve: update history terms self.DuDt.update_pre_solve(timestep, verbose=verbose) self.DFDt.update_pre_solve(timestep, verbose=verbose) # Solve PDE (bypass SNES_Darcy.solve to avoid double setup/projection) SNES_Scalar.solve(self, zero_init_guess, _force_setup, divergence_retries=divergence_retries) # Invalidate cached data views target_var = getattr(self.u, "_base_var", self.u) if hasattr(target_var, "_canonical_data"): target_var._canonical_data = None # Post-solve: shift history self.DuDt.update_post_solve(timestep, verbose=verbose) self.DFDt.update_post_solve(timestep, verbose=verbose) # Velocity projection (inherited from Darcy) self._v_projector.uw_function = self.darcy_flux self._v_projector.solve(zero_init_guess) self.is_setup = True self.constitutive_model._solver_is_setup = True return class SNES_Richards(SNES_TransientDarcy): r""" Richards equation solver for variably-saturated porous media flow. Two formulations are supported: **Mixed form** (mass-conservative, preferred) — set ``water_content``: .. math:: \frac{\partial \theta}{\partial t} - \nabla \cdot \left[ K(\psi) (\nabla \psi - \mathbf{s}) \right] = f discretised as :math:`(\theta(\psi^{n+1}) - \theta(\psi^n)) / \Delta t`, which is exactly conservative by construction (Celia et al., 1990). **Head-based form** (backward compatible) — set ``capacity``: .. math:: C(\psi) \frac{\partial \psi}{\partial t} - \nabla \cdot \left[ K(\psi) (\nabla \psi - \mathbf{s}) \right] = f where :math:`C(\psi) = d\theta/d\psi` is the specific moisture capacity. Parameters ---------- mesh : Mesh The computational mesh. psi_Field : MeshVariable, optional Mesh variable for pressure head :math:`\psi`. v_Field : MeshVariable, optional Mesh variable for Darcy velocity. order : int, default=1 Time integration order. theta : float, default=0.5 Implicit time weighting. degree : int, default=2 Polynomial degree. verbose : bool, default=False Enable verbose output. DuDt : optional Time derivative operator for the unknown. DFDt : optional Time derivative operator for the flux. Attributes ---------- water_content : sympy.Expr or None Water content function :math:`\theta(\psi)` for the mixed form. When set, the storage term uses :math:`(\theta(\psi^{n+1}) - \theta(\psi^n)) / \Delta t`. Takes precedence over ``capacity`` if both are set. capacity : sympy.Expr Specific moisture capacity :math:`C(\psi)` for the head-based form. Used only when ``water_content`` is None. psi : MeshVariable Alias for ``self.u`` (pressure head). See Also -------- SNES_TransientDarcy : Linear parent solver. underworld3.utilities.retention_curves : Van Genuchten and Gardner functions. Examples -------- Mixed form (preferred): >>> from underworld3.utilities.retention_curves import ( ... van_genuchten_theta, van_genuchten_K, ... ) >>> richards = uw.systems.Richards(mesh, psi_Field=psi, v_Field=v) >>> richards.constitutive_model = uw.constitutive_models.DarcyFlowModel >>> richards.constitutive_model.Parameters.permeability = van_genuchten_K( ... psi.sym[0], Ks=1e-4, alpha=3.35, n=2.0, ... ) >>> richards.water_content = van_genuchten_theta( ... psi.sym[0], theta_r=0.045, theta_s=0.43, alpha=3.35, n=2.0, ... ) Head-based form (backward compatible): >>> from underworld3.utilities.retention_curves import van_genuchten_C >>> richards.capacity = van_genuchten_C( ... psi.sym[0], theta_r=0.045, theta_s=0.43, alpha=3.35, n=2.0, ... ) """ @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, psi_Field: Optional[uw.discretisation.MeshVariable] = None, v_Field: Optional[uw.discretisation.MeshVariable] = None, order: int = 1, theta: float = 0.5, degree: int = 2, verbose=False, DuDt=None, DFDt=None, ): super().__init__( mesh, psi_Field, v_Field, order, theta, degree, verbose, DuDt, DFDt ) self._capacity = sympy.sympify(1) self._water_content = None # None → head-based form; set → mixed form @property def water_content(self): r"""Water content function :math:`\theta(\psi)` for the mixed form. When set, the storage term uses :math:`(\theta(\psi^{n+1}) - \theta(\psi^n)) / \Delta t` instead of :math:`C(\psi) \cdot (\psi^{n+1} - \psi^n) / \Delta t`, giving exact mass conservation (Celia et al., 1990). The expression should be a SymPy function of ``psi.sym[0]``. The Jacobian :math:`\partial\theta/\partial\psi = C(\psi)` is computed automatically by PETSc (finite-difference colouring), so there is no need to provide :math:`C(\psi)` separately. """ return self._water_content @water_content.setter def water_content(self, value): self._needs_function_rewire = True self._water_content = sympy.sympify(value) if value is not None else None @property def capacity(self): r"""Specific moisture capacity :math:`C(\psi) = d\theta/d\psi`. Used only when ``water_content`` is None (head-based form). Typically a nonlinear SymPy expression depending on ``psi.sym[0]``. """ return self._capacity @capacity.setter def capacity(self, value): self._needs_function_rewire = True self._capacity = sympy.sympify(value) @property def psi(self): """Alias for ``self.u`` (pressure head).""" return self.u @property def F0(self): r"""Pointwise storage + source term. Mixed form: :math:`(\theta(\psi^{n+1}) - \theta(\psi^n)) / \Delta t - f` Head-based form: :math:`C(\psi) (\psi^{n+1} - \psi^n) / \Delta t - f` """ if self._water_content is not None: # Mixed (mass-conservative) form: # θ(ψ^{n+1}) is self._water_content (already in terms of psi.sym[0]) # θ(ψ^n) is water_content with psi.sym[0] → psi_star[0].sym[0] psi_sym = self.psi.sym[0] psi_star_sym = self.DuDt.psi_star[0].sym[0] theta_new = self._water_content theta_old = self._water_content.subs(psi_sym, psi_star_sym) storage_term = sympy.Matrix([[theta_new - theta_old]]) / self.delta_t else: # Head-based form (backward compatible) storage_term = self.capacity * self.DuDt.bdf() / self.delta_t f0 = expression( r"f_0(\psi)", -self.f + storage_term, "Richards storage + source term", ) self._f0 = f0 return f0 ## -------------------------------- ## Stokes saddle point solver plus ## ancilliary functions - note that ## we need to update the description ## of the generic saddle pt solver ## to remove the Stokes-specific stuff ## --------------------------------
[docs] class SNES_Stokes(SNES_Stokes_SaddlePt): r""" Stokes equation solver for incompressible viscous flow. This class provides functionality for a discrete representation of the Stokes flow equations assuming an incompressibility (or near-incompressibility) constraint. The momentum equation is: .. math:: -\nabla \cdot \left[ \boldsymbol{\tau} - p \mathbf{I} \right] = \mathbf{f} with the incompressibility constraint: .. math:: \nabla \cdot \mathbf{u} = 0 The flux term is a deviatoric stress (:math:`\boldsymbol{\tau}`) related to velocity gradients (:math:`\nabla \mathbf{u}`) through a viscosity tensor :math:`\eta`, and a volumetric (pressure) part :math:`p`: .. math:: \boldsymbol{\tau} = \frac{\eta}{2}\left( \nabla \mathbf{u} + \nabla \mathbf{u}^T \right) The constraint equation gives incompressible flow by default but can be set to any function of the unknown :math:`\mathbf{u}` and :math:`\nabla \cdot \mathbf{u}`. Parameters ---------- mesh : uw.discretisation.Mesh The computational mesh. velocityField : uw.discretisation.MeshVariable, optional Pre-existing velocity field. If None, one is created automatically. pressureField : uw.discretisation.MeshVariable, optional Pre-existing pressure field. If None, one is created automatically. degree : int, optional Polynomial degree for velocity interpolation. Default is 2. p_continuous : bool, optional If True (default), pressure is continuous. Set False for discontinuous pressure. verbose : bool, optional Enable verbose output during solving. Default is False. DuDt : SemiLagrangian_DDt or Lagrangian_DDt, optional Material derivative operator for velocity (used in derived classes). DFDt : SemiLagrangian_DDt or Lagrangian_DDt, optional Material derivative operator for flux (used in viscoelastic models). Notes ----- **Viscosity model**: The viscosity tensor :math:`\boldsymbol{\eta}` is provided by setting the ``constitutive_model`` property to one of the ``uw.constitutive_models`` classes. It may be constant, spatially varying, non-linear, or anisotropic. **Augmented Lagrangian**: Setting ``penalty`` to a non-zero value adds :math:`\lambda \nabla \cdot \mathbf{u}` to the weak form, improving convergence for incompressible flow (in addition to the constraint equation). **Mixed finite elements**: The pressure field interpolation order determines the integration order of the mixed method and is typically lower than the velocity field order. **Viscoelastic models**: For viscoelastic behaviour, the flux term contains stress history tracked on a particle swarm. See :class:`SNES_VE_Stokes`. See Also -------- SNES_VE_Stokes : Viscoelastic Stokes solver with flux history. SNES_NavierStokes : Navier-Stokes solver with inertial terms. uw.constitutive_models : Available viscosity models. Examples -------- >>> import underworld3 as uw >>> mesh = uw.meshing.UnstructuredSimplexBox(minCoords=(0,0), maxCoords=(1,1), cellSize=0.1) >>> stokes = uw.systems.Stokes(mesh, degree=2) >>> stokes.constitutive_model = uw.constitutive_models.ViscousFlowModel() >>> stokes.constitutive_model.Parameters.viscosity = 1.0 >>> stokes.bodyforce = [0, -1] # gravity >>> stokes.solve() """ instances = 0
[docs] def __init__( self, mesh: uw.discretisation.Mesh, velocityField: Optional[uw.discretisation.MeshVariable] = None, pressureField: Optional[uw.discretisation.MeshVariable] = None, degree: Optional[int] = 2, p_continuous: Optional[bool] = True, verbose: Optional[bool] = False, # Not used in Stokes, but may be used in NS, VE etc DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, ): super().__init__( mesh, velocityField, pressureField, degree, p_continuous, verbose, DuDt=DuDt, DFDt=DFDt, ) self._degree = degree # User-facing operations are matrices / vectors by preference self._Estar = None self._penalty = expression(R"\uplambda", 0, "Numerical Penalty") self._constraints = sympy.Matrix((self.div_u,)) # by default, incompressibility constraint self._bodyforce = expression( Rf"\mathbf{{f}}_0\left( {self.Unknowns.u.symbol} \right)", sympy.Matrix([[0] * self.mesh.dim]), "Stokes pointwise force term: f_0(u)", ) # this attrib records if we need to setup the problem (again) self._needs_function_rewire = True self._constitutive_model = None # Optional: alternative F1 expression to autodiff for the # Jacobian. When None, autodiff F1 itself (default). Used for # inexact-Newton tricks like a smooth-Jacobian / sharp-residual # split at a VEP yield kink. See ``set_jacobian_F1_source``. self._F1_jacobian_source = None return
[docs] def set_jacobian_F1_source(self, F1_source, linesearch="cp"): r"""Override the F1 expression used to build the Jacobian blocks. By default, the Stokes Jacobian's uu / up G2, G3 blocks are autodiff'd from the residual F1. Some problems benefit from differentiating a *different* but related expression — e.g. a smooth (softmin) viscosity formula for the Jacobian while the residual F1 keeps a sharp Min, so Newton sees a continuous derivative even when the iterate sits exactly on the yield kink. Setting ``F1_source`` triggers a JIT recompile (the Jacobian symbols change). Pass ``None`` to revert to autodiff of F1. Parameters ---------- F1_source : sympy.Matrix or None Alternative expression of the same shape as ``F1.sym``. linesearch : str or None, default ``"cp"`` SNES linesearch type to install when ``F1_source`` is set. Defaults to ``"cp"`` (critical-point) because inexact-Newton steps don't reliably reduce the residual norm and PETSc's default ``bt`` (backtracking) consequently rejects useful steps with ``DIVERGED_LINE_SEARCH``. ``cp`` accepts the predicted step at the optimum of the local linearisation and converges cleanly on the same problems where ``bt`` flails. Set to ``None`` to leave the linesearch type untouched (e.g. if you've already configured one via ``petsc_options``). Has no effect when ``F1_source is None``. """ self._F1_jacobian_source = F1_source self._needs_function_rewire = True if F1_source is not None and linesearch is not None: self.petsc_options["snes_linesearch_type"] = linesearch
def _create_stress_history_ddt(self, order=2): """Create DFDt for stress history tracking (VE/VEP models). Called automatically when a constitutive model with ``requires_stress_history = True`` is assigned. Can also be called explicitly to pre-create the DFDt with a specific order. Constitutive models can inject extra SemiLagrangian kwargs via the ``stress_history_ddt_kwargs`` property — used e.g. by ``MaxwellExponentialFlowModel`` to set ``with_forcing_history=True``. """ if self.Unknowns.DFDt is not None: return # already created self._order = order # Constitutive model may request extra SemiLagrangian kwargs (e.g. # with_forcing_history for ETD-2 integration). cm = getattr(self, "constitutive_model", None) ddt_kwargs = {} if cm is not None: ddt_kwargs = dict(getattr(cm, "stress_history_ddt_kwargs", {})) self.Unknowns.DFDt = uw.systems.ddt.SemiLagrangian( self.mesh, sympy.Matrix.zeros(self.mesh.dim, self.mesh.dim), self.u.sym, vtype=uw.VarType.SYM_TENSOR, degree=self.u.degree - 1, continuous=True, varsymbol=rf"{{F[ {self.u.symbol} ] }}", verbose=self.verbose, bcs=None, order=order, smoothing=0.0001, **ddt_kwargs, ) # Stress flux = 2·viscosity·E_eff references psi_star[0] in E_eff's # history term — without snapshot substitution the projection of # flux→psi_star[0] becomes implicit in psi_star[0] and Min-mode at # yield admits the wrong fixed point under timestep change. self.Unknowns.DFDt.enable_source_snapshot()
[docs] @timing.routine_timer_decorator @memprobe.instrument("Stokes.solve") def solve( self, zero_init_guess: bool = True, timestep: float = None, _force_setup: bool = False, verbose: bool = False, debug: bool = False, debug_name: str = None, evalf=False, order=None, picard: int = 0, divergence_retries: int = 0, ): """Solve the Stokes system, with optional viscoelastic stress history. When a constitutive model with stress history is active (DFDt is not None), the solve includes pre/post hooks for advecting stress history, updating BDF coefficients, and projecting the actual stress after the solve. Parameters ---------- zero_init_guess : bool If True, use zero initial guess. Otherwise use current field values. timestep : float, optional Advection timestep. Required when stress history is active. _force_setup : bool Force rebuild of pointwise functions. verbose : bool Enable verbose output. evalf : bool Force numerical evaluation during history updates. order : int, optional Override the VE time integration order. picard : int, default=0 Number of Picard iterations before switching to Newton. Picard uses a simplified Jacobian and can help convergence for strongly nonlinear problems like VEP at yield onset. divergence_retries : int, default=0 If SNES returns a DIVERGED reason after the main solve, re-call the underlying Newton up to this many times with a warm start (``zero_init_guess=False``) to try to rescue. Each retry uses the just-computed iterate plus the freshly-advected stress history, which is often enough for VEP at yield onset (Min/softmin kinks) to step off a bad Newton iterate. ``0`` preserves legacy behaviour (divergence is terminal). Typical useful value is 1. Only applies in the VE/VEP branch (``DFDt is not None``). """ has_stress_history = self.Unknowns.DFDt is not None if has_stress_history: if timestep is None: raise ValueError( "timestep is required for viscoelastic solve. " "Call stokes.solve(timestep=dt)" ) # dt_elastic must always equal the solve timestep. The constitutive # model's VE formulas (eta_eff, stress history terms) all reference # Parameters.dt_elastic. If it differs from the actual timestep, # the stress computation is inconsistent with the time integration. self.constitutive_model.Parameters.dt_elastic = timestep if order is None or order > self._order: order = self._order if _force_setup: self._needs_function_rewire = True # Re-setup when effective_order changes (DDt history ramp-up) _current_eff_order = self.constitutive_model.effective_order if not hasattr(self, '_prev_effective_order'): self._prev_effective_order = None if _current_eff_order != self._prev_effective_order: self._needs_function_rewire = True self.constitutive_model._solver_is_setup = False self._prev_effective_order = _current_eff_order if not self.constitutive_model._solver_is_setup: self._needs_function_rewire = True self.DFDt.psi_fn = self.constitutive_model.flux.T if not self.is_setup: self._setup_pointwise_functions(verbose) self._setup_discretisation(verbose) self._setup_solver(verbose) # 1. ADVECT stress history along characteristics if uw.mpi.rank == 0 and verbose: print(f"Stokes solver - advect stress history", flush=True) self.DFDt.update_pre_solve(timestep, verbose=verbose, evalf=evalf, store_result=False) # Uniform pre-solve coefficient hook: VEP delegates to # _update_bdf_coefficients(); MaxwellExponentialFlowModel updates # α, φ on the DDt via _update_exp_coefficients(). No isinstance # checks at the solver layer. self.constitutive_model._update_history_coefficients() # 2. SOLVE if uw.mpi.rank == 0 and verbose: print(f"Stokes solver - solve", flush=True) super().solve( zero_init_guess, _force_setup=_force_setup, verbose=verbose, picard=picard, divergence_retries=divergence_retries, ) # 3. PROJECT actual stress and SHIFT history if uw.mpi.rank == 0 and verbose: print(f"Stokes solver - store stress and shift history", flush=True) import numpy as np _advected_sigma_star = np.copy(self.DFDt.psi_star[0].array[...]) if getattr(self.DFDt, '_psi_star_use_multicomponent', False): # Multi-component projection of flux → psi_star[0]. # # The DFDt's source-snapshot machinery (enabled once in # _create_stress_history_ddt) intercepts psi_fn assignment # to substitute psi_star[0] symbols with a frozen # psi_snapshot variable, refreshed each step in # update_pre_solve. So the projection's compiled source # reads from psi_snapshot (not psi_star[0] itself) and is a # true one-shot Galerkin projection — no implicit # fixed-point iteration. self.DFDt._psi_star_projection_solver.smoothing = 0.0 self.DFDt._psi_star_projection_solver.solve(verbose=verbose) # Fan flat result back to psi_star[0] tensor variable for k, (i, j) in enumerate(self.DFDt._psi_star_indep_indices): vals = self.DFDt._psi_star_flat_var.array[:, 0, k] self.DFDt.psi_star[0].array[:, i, j] = vals if i != j: self.DFDt.psi_star[0].array[:, j, i] = vals else: self.DFDt._psi_star_projection_solver.uw_function = self.constitutive_model.flux self.DFDt._psi_star_projection_solver.smoothing = 0.0 self.DFDt._psi_star_projection_solver.solve(verbose=verbose) for i in range(self.DFDt.order - 1, 0, -1): if i == 1: self.DFDt.psi_star[i].array[...] = _advected_sigma_star else: self.DFDt.psi_star[i].array[...] = self.DFDt.psi_star[i - 1].array[...] self.DFDt.update_post_solve(timestep, verbose=verbose, evalf=evalf) # Uniform post-solve hook for any extra integrator-state storage. # VEP: no-op. ETD-2 / MaxwellExponentialFlowModel: refresh # forcing_star with current ε̇^{n+1} so the next step's history # term has access to ε̇ⁿ. self.constitutive_model._update_history_post_solve() self.is_setup = True self.constitutive_model._solver_is_setup = True else: # Plain Stokes — no stress history super().solve( zero_init_guess, _force_setup=_force_setup, verbose=verbose, divergence_retries=divergence_retries, )
@property def tau(self): r"""Deviatoric stress from the most recent solve. When stress history is active (VEP), returns ``psi_star[0]`` which contains the actual projected stress. Otherwise falls through to the base class lazy projection. """ if self.Unknowns.DFDt is not None: return self.DFDt.psi_star[0] return super().tau # ========================================================================= # PETSc Residual Templates # These define the weak form terms assembled by PETSc's finite element system. # F0/F1 are velocity equation terms, PF0 is the pressure/continuity equation. # ========================================================================= F0 = Template( r"\mathbf{f}_0\left( \mathbf{u} \right)", lambda self: -self.bodyforce.sym, r"""Velocity equation body force term (pointwise). The $\mathbf{f}_0$ term represents external body forces acting on the fluid, typically gravity. This is the negative of the ``bodyforce`` parameter. """, ) F1 = Template( r"\mathbf{F}_1\left( \mathbf{u} \right)", lambda self: ( self.stress + self.penalty * self.div_u * sympy.eye(self.mesh.dim) ), r"""Velocity equation flux/stress term (pointwise). The $\mathbf{F}_1$ tensor represents the stress response of the fluid, combining deviatoric stress $\boldsymbol{\tau}$, pressure $p$, and penalty term for weak incompressibility. """, ) PF0 = Template( r"\mathbf{h}_0\left( \mathbf{p} \right)", lambda self: sympy.Matrix((self.constraints)), r"""Pressure equation constraint term (continuity). The $h_0$ term enforces the incompressibility constraint $\nabla \cdot \mathbf{u} = 0$. Additional constraints can be added via ``add_condition()``. """, ) # deprecated
[docs] @timing.routine_timer_decorator def stokes_problem_description(self): """Build residual terms for Stokes FEM assembly (deprecated).""" # f0 residual term self._u_f0 = self.F0.sym # f1 residual term self._u_f1 = self.F1.sym # p0 residual term self._p_f0 = self.PF0.sym return
@property def CM_is_setup(self): """Whether the constitutive model is configured for this solver.""" return self._constitutive_model._solver_is_setup @property def strainrate(self): r"""Symmetric strain rate tensor from velocity gradients. The strain rate tensor :math:`\dot{\varepsilon}` is computed as: .. math:: \dot{\varepsilon}_{ij} = \frac{1}{2}\left(\frac{\partial u_i}{\partial x_j} + \frac{\partial u_j}{\partial x_i}\right) Returns ------- sympy.Matrix Symmetric tensor of shape ``(dim, dim)`` where ``dim`` is the mesh dimensionality. See Also -------- strainrate_1d : Voigt notation (vector) form. stress_deviator : Deviatoric stress computed from strain rate. """ return sympy.Matrix(self.mesh.vector.strain_tensor(self.Unknowns.u.sym)) @property def strainrate_1d(self): r"""Strain rate in Voigt notation (vector form). Converts the symmetric strain rate tensor to Voigt notation for use in constitutive model calculations. In 2D, returns a 3-component vector; in 3D, a 6-component vector. Returns ------- sympy.Matrix Strain rate in Voigt notation. See Also -------- strainrate : Full tensor form. """ return uw.maths.tensor.rank2_to_voigt(self.strainrate, self.mesh.dim) @property def strainrate_star_1d(self): r"""Historical strain rate in Voigt notation (for viscoelastic models). Used in viscoelastic formulations where the stress depends on both current and historical strain rates. Returns ------- sympy.Matrix Historical strain rate in Voigt notation. See Also -------- strainrate_1d : Current strain rate. """ return uw.maths.tensor.rank2_to_voigt(self.strainrate_star, self.mesh.dim) @property def stress_deviator(self): r"""Deviatoric stress tensor from the constitutive model. The deviatoric stress :math:`\boldsymbol{\tau}` is the traceless part of the stress tensor, computed by the constitutive model from the strain rate: .. math:: \boldsymbol{\tau} = \boldsymbol{\eta} : \dot{\boldsymbol{\varepsilon}} For a Newtonian fluid: :math:`\boldsymbol{\tau} = 2\eta\dot{\boldsymbol{\varepsilon}}` Returns ------- sympy.Matrix Deviatoric stress tensor of shape ``(dim, dim)``. See Also -------- stress : Total stress (deviatoric + pressure). constitutive_model : Provides the viscosity relationship. """ return self.constitutive_model.flux @property def stress_deviator_1d(self): r"""Deviatoric stress in Voigt notation. Returns ------- sympy.Matrix Deviatoric stress in Voigt notation. See Also -------- stress_deviator : Full tensor form. """ return uw.maths.tensor.rank2_to_voigt(self.stress_deviator, self.mesh.dim) @property def stress(self): r"""Total Cauchy stress tensor. The total stress combines the deviatoric stress and pressure: .. math:: \boldsymbol{\sigma} = \boldsymbol{\tau} - p\mathbf{I} where :math:`\boldsymbol{\tau}` is the deviatoric stress and :math:`p` is the pressure (positive in compression). Returns ------- sympy.Matrix Total stress tensor of shape ``(dim, dim)``. See Also -------- stress_deviator : Deviatoric (traceless) part. """ return self.stress_deviator - sympy.eye(self.mesh.dim) * (self.p.sym[0]) @property def stress_1d(self): r"""Total stress in Voigt notation. Returns ------- sympy.Matrix Total stress in Voigt notation. See Also -------- stress : Full tensor form. """ return uw.maths.tensor.rank2_to_voigt(self.stress, self.mesh.dim) @property def div_u(self): r"""Velocity divergence. For incompressible flow, this should be zero: .. math:: \nabla \cdot \mathbf{u} = 0 Returns ------- sympy.Expr Scalar divergence expression. Notes ----- Non-zero divergence indicates compressibility or mass sources/sinks. """ return self.mesh.vector.divergence(self.Unknowns.u.sym) @property def constraints(self): r"""Constraint equation for the saddle-point system. By default, this is the incompressibility constraint :math:`\nabla \cdot \mathbf{u} = 0`. Can be modified for compressible or other constrained formulations. Returns ------- sympy.Expr Constraint expression. """ return self._constraints @constraints.setter def constraints(self, constraints_matrix): """Set the constraint equation (e.g., incompressibility).""" self._is_setup = False symval = sympify(constraints_matrix) self._constraints = symval @property def bodyforce(self): r"""Body force vector (source term). The volumetric body force :math:`\mathbf{f}` appears on the right-hand side of the momentum equation: .. math:: -\nabla \cdot \boldsymbol{\sigma} = \mathbf{f} Common examples include gravity (:math:`\rho\mathbf{g}`) or buoyancy forces. Returns ------- UWexpression Body force vector expression. """ return self._bodyforce @bodyforce.setter def bodyforce(self, value): """Set the body force vector (e.g., gravity, buoyancy).""" self._needs_function_rewire = True if isinstance(value, uw.function.expressions.UWexpression): self._bodyforce.sym = value.sym else: # Convert UWQuantity objects to SymPy expressions before Matrix creation converted_value = [] for item in value: if isinstance(item, uw.function.quantities.UWQuantity): sympified = item._sympify_() # If UWQuantity contains a Matrix, extract the scalar element if hasattr(sympified, "shape") and sympified.shape == (1, 1): converted_value.append(sympified[0, 0]) else: converted_value.append(sympified) else: converted_value.append(item) self._bodyforce.sym = sympy.Matrix(converted_value) @property def saddle_preconditioner(self): r"""Preconditioner for the Schur complement in the saddle-point system. For the Stokes system, the default preconditioner is :math:`1/\eta` (inverse viscosity), which approximates the Schur complement :math:`\mathbf{S} \approx \mathbf{B}\mathbf{A}^{-1}\mathbf{B}^T`. Returns ------- sympy.Expr Preconditioner expression (typically inverse viscosity). Notes ----- A good preconditioner significantly improves convergence of the iterative solver. For variable viscosity, use the local viscosity. """ return self._saddle_preconditioner @saddle_preconditioner.setter def saddle_preconditioner(self, value): """Set the Schur complement preconditioner.""" self._needs_function_rewire = True symval = sympify(value) self._saddle_preconditioner = symval @property def penalty(self): r"""Augmented Lagrangian penalty parameter. The penalty $\lambda$ adds a term to the weak form that penalizes non-zero divergence: .. math:: \lambda \int (\nabla \cdot \mathbf{u})(\nabla \cdot \mathbf{v}) \, dV This improves convergence for incompressible flow without changing the solution (since $\nabla \cdot \mathbf{u} = 0$ at convergence). Returns ------- UWexpression Augmented Lagrangian penalty parameter (typically $O(1)$). Notes ----- Set to zero for standard Stokes without augmentation. Unlike classical penalty methods that require very large values, the Augmented Lagrangian approach uses modest penalties of $O(1)$ to improve solver convergence. References ---------- Glowinski, R., & Le Tallec, P. (1989). *Augmented Lagrangian and Operator-Splitting Methods in Nonlinear Mechanics*. SIAM. https://doi.org/10.1137/1.9781611970838 """ return self._penalty @penalty.setter def penalty(self, value): """Set the augmented Lagrangian penalty parameter.""" self._needs_function_rewire = True self._penalty.sym = value # @property # def continuity_rhs(self): # return self._continuity_rhs # @continuity_rhs.setter # def continuity_rhs(self, value): # self.is_setup = False # self._continuity_rhs.sym = value
[docs] @timing.routine_timer_decorator def estimate_dt(self): r""" Calculates an appropriate advective timestep for the Stokes solver. The Stokes equations are quasi-static (no time derivative ∂v/∂t), so there is no diffusive CFL constraint. The only relevant timescale is the advective one: how long it takes material to cross an element. This method computes a per-element timestep: .. math:: \delta t_i = h_i / \|\mathbf{v}_i\| where $h_i$ is the element radius and $\mathbf{v}_i$ is the velocity at the element centroid, then returns the global minimum. This is more accurate than using global max velocity with global min element size, especially for non-uniform meshes with spatially varying velocity. Returns: Pint Quantity or float: The advective timestep with physical time units if a model with reference scales is available, otherwise nondimensional. """ from mpi4py import MPI # Evaluate velocity at element centroids (consistent with AdvDiff) vel = uw.function.evaluate(self.u.sym, self.mesh._centroids) # If vel is unit-aware (UnitAwareArray), nondimensionalise it to get # consistent nondimensional values that match mesh._radii # Note: .magnitude returns physical units, which would be wrong here if hasattr(vel, "units") and vel.units is not None: vel = uw.non_dimensionalise(vel) elif hasattr(vel, "magnitude"): # Plain UWQuantity without units context - use magnitude vel = vel.magnitude # Ensure vel is a plain numpy array vel = np.asarray(vel) # Squeeze out any singleton dimensions from evaluate (shape: N,1,dim -> N,dim) vel = np.squeeze(vel) # Handle edge case of single element (ensure 2D) if vel.ndim == 1: vel = vel.reshape(1, -1) # Get per-element velocity magnitudes vel_magnitudes = np.linalg.norm(vel, axis=1) # Get per-element radii (characteristic element size) element_radii = self.mesh._radii # Compute per-element advective timestep: dt_i = h_i / |v_i| # Avoid division by zero for elements with zero velocity with np.errstate(divide='ignore', invalid='ignore'): dt_per_element = np.where( vel_magnitudes > 0, element_radii / vel_magnitudes, np.inf ) # Get local minimum timestep min_dt_local = np.min(dt_per_element) if len(dt_per_element) > 0 else np.inf # Get global minimum timestep (parallel-safe) comm = uw.mpi.comm min_dt_glob = comm.allreduce(min_dt_local, op=MPI.MIN) # If all velocities are zero, return infinity if np.isinf(min_dt_glob): return np.inf # Dimensionalise to physical time try: return uw.dimensionalise(np.squeeze(min_dt_glob), {'[time]': 1}) except Exception: return np.squeeze(min_dt_glob)
[docs] class SNES_VE_Stokes(SNES_Stokes): r"""Viscoelastic Stokes solver (backward-compatibility wrapper). .. deprecated:: Use ``uw.systems.Stokes`` directly with a ``ViscoElasticPlasticFlowModel`` constitutive model. The Stokes solver now creates stress history infrastructure automatically when the constitutive model is assigned (the lazy-creation pathway also reads ``stress_history_ddt_kwargs`` from the model — required for ``integrator='etd'`` to allocate ``forcing_star``). VE_Stokes pre-creates the DDt at solver ``__init__`` time, before the model exists, so it can't see those kwargs and is incompatible with ``integrator='etd'``. Migration: replace:: stokes = uw.systems.VE_Stokes(mesh, velocityField=v, pressureField=p, order=2) stokes.constitutive_model = uw.constitutive_models.ViscoElasticPlasticFlowModel with:: stokes = uw.systems.Stokes(mesh, velocityField=v, pressureField=p) stokes.constitutive_model = uw.constitutive_models.ViscoElasticPlasticFlowModel( stokes.Unknowns, order=2, ) Parameters ---------- mesh : Mesh The computational mesh. order : int, default=2 BDF time integration order for stress history. **kwargs All other arguments are passed to :class:`SNES_Stokes`. """ instances = 0
[docs] def __init__( self, mesh: uw.discretisation.Mesh, velocityField: Optional[uw.discretisation.MeshVariable] = None, pressureField: Optional[uw.discretisation.MeshVariable] = None, degree: Optional[int] = 2, order: Optional[int] = 2, p_continuous: Optional[bool] = True, verbose: Optional[bool] = False, DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, ): import warnings warnings.warn( "VE_Stokes is deprecated. Use uw.systems.Stokes(...) directly and " "assign the constitutive model afterwards — the Stokes solver creates " "DDt infrastructure lazily when the model is assigned, and the lazy " "path correctly forwards stress_history_ddt_kwargs from the model " "(required for integrator='etd' to allocate forcing_star). " "VE_Stokes pre-creates DDt at __init__ time, before the model exists, " "so it cannot use ETD-2. See the class docstring for migration.", DeprecationWarning, stacklevel=2, ) super().__init__( mesh, velocityField, pressureField, degree, p_continuous, verbose, DuDt=DuDt, DFDt=DFDt, ) # Pre-create DFDt so it's available before constitutive model is set self._create_stress_history_ddt(order=order) return
@property def delta_t(self): """Elastic timestep from the constitutive model.""" return self.constitutive_model.Parameters.dt_elastic
[docs] class SNES_Projection(SNES_Scalar): r""" Scalar projection solver for mapping functions to mesh variables. Solves :math:`u = \tilde{f}` where :math:`\tilde{f}` is a function that can be evaluated within an element and :math:`u` is a mesh variable with associated shape functions. Typically used to obtain a continuous representation of a function not well-defined at mesh nodes (e.g., derivatives or flux components). More broadly, it is a projection from one basis to another. Strong form (the screened-Poisson smoother) ------------------------------------------- The projection is implemented by solving .. math:: u - \nabla \cdot \left( \alpha \nabla u \right) = \tilde{f}, or equivalently, in the more familiar Helmholtz form .. math:: u - \alpha \, \nabla^{2} u \;=\; \tilde{f}. With :math:`\alpha = 0` this is a pure pointwise L2 projection of :math:`\tilde f` onto the discrete space of :math:`u`. With :math:`\alpha > 0` it is a *screened-Poisson smoother*: the equation enforces a balance between fidelity to :math:`\tilde f` and curvature of :math:`u`. The natural length scale that emerges from this balance is .. math:: L \;=\; \sqrt{\alpha}, so :math:`\alpha` has dimensions of **length squared**. The free-space Green's function of the operator decays as :math:`\exp(-r/L)` (in 2D it is :math:`G(r) \propto K_{0}(r/L)/L^{2}`), so the solution behaves like a Gaussian-like convolution of :math:`\tilde f` of width :math:`L` — obtained implicitly by one elliptic solve, without ever assembling the kernel. Features in :math:`\tilde f` of scale much smaller than :math:`L` are attenuated; features much larger than :math:`L` pass through essentially unchanged. Weak form --------- Multiplying by a test function :math:`v` and integrating by parts gives the symmetric weak form actually assembled by PETSc: .. math:: \int_\Omega (u - \tilde f)\, v \; + \; \int_\Omega \alpha \, \nabla u \cdot \nabla v \;=\; 0, which is exactly minimising :math:`\tfrac{1}{2}\!\int (u-\tilde f)^2 + \tfrac{\alpha}{2}\!\int |\nabla u|^2` — a Tikhonov-regularised L2 projection. Setting the smoothing length ---------------------------- Two equivalent accessors are provided: * :attr:`smoothing` — set the coefficient :math:`\alpha` directly (units of length²). Historically used with tiny values (e.g. :math:`10^{-6}`) as a *numerical* regulariser, which corresponds to a sub-grid :math:`L` and produces no physical smoothing. * :attr:`smoothing_length` — set :math:`L` directly (length units, unit-aware via :func:`underworld3.non_dimensionalise`). This is the recommended path when you actually want the projection to act as a low-pass filter of a chosen physical scale. See Also -------- SNES_Vector_Projection : Vector field projection. SNES_Tensor_Projection : Tensor field projection. Parameters ---------- mesh : Mesh The computational mesh. u_Field : MeshVariable, optional Target mesh variable for the projection. scalar_Field : MeshVariable, optional Alternative name for the target field. degree : int, default=2 Polynomial degree for the finite element space. solver_name : str, optional Name for the solver instance. verbose : bool, default=False Enable verbose output. """
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, u_Field: uw.discretisation.MeshVariable = None, degree=2, verbose=False, ): super().__init__( mesh, u_Field, degree, verbose, ) self._needs_function_rewire = True self._smoothing = sympy.sympify(0) self._uw_weighting_function = sympy.sympify(1) self._uw_function = sympy.Matrix([0]) # Default: project zero self._constitutive_model = uw.constitutive_models.Constitutive_Model(self.Unknowns) return
# ========================================================================= # PETSc Residual Templates # L2 projection: minimize ||u - f||^2 with optional smoothing # ========================================================================= F0 = Template( r"f_0 \left( \mathbf{u} \right)", lambda self: (self.u.sym - self.uw_function) * self.uw_weighting_function, r"""Projection misfit term (pointwise). The $f_0$ term measures the weighted difference between the mesh variable and the target function for L2 projection. """, ) F1 = Template( r"\mathbf{F}_1\left( \mathbf{u} \right)", lambda self: self.smoothing * self.mesh.vector.gradient(self.u.sym), r"""Projection smoothing term (pointwise). The $\mathbf{F}_1$ term provides optional regularization via $\alpha \nabla u$, where $\alpha$ is the ``smoothing`` parameter. """, ) # Use SymbolicProperty for automatic unwrapping uw_function = SymbolicProperty(matrix_wrap=True, doc="Function to project onto mesh") @property def smoothing(self): r"""Smoothing coefficient :math:`\alpha` of the screened-Poisson form. The projection solves :math:`u - \nabla\!\cdot\!(\alpha\,\nabla u) = \tilde f`, so :math:`\alpha` has dimensions of **length²** and the implied smoothing length is :math:`L = \sqrt{\alpha}`. The free-space Green's function decays as :math:`\exp(-r/L)`, giving the projection the action of a Gaussian-like convolution of width :math:`L` — without ever forming the kernel. See the class docstring for the full derivation. Two usage patterns ~~~~~~~~~~~~~~~~~~ * **Pure L2 projection** — ``smoothing = 0`` (or omit). No regularisation; ``u`` is the best L2 fit to ``ũ``. * **Genuine length-scale smoother** — set :attr:`smoothing_length` to the desired physical length (unit-aware), or equivalently ``smoothing = L**2``. The output is ``ũ`` smoothed at scale ``L``. .. note:: Historical UW3 code occasionally sets ``smoothing`` to a tiny number (e.g. ``1e-6``) as a *numerical* regulariser against rank deficiency. That value corresponds to :math:`L \approx 10^{-3}` in the problem's ND length units — almost always well below the cell size, so it does no useful smoothing. Use a true sub-grid value only if you need that regularisation; for filtering, use :attr:`smoothing_length`. """ return self._smoothing @smoothing.setter def smoothing(self, smoothing_factor): """Set the smoothing regularization parameter.""" self._needs_function_rewire = True self._smoothing = sympify(smoothing_factor) @property def smoothing_length(self): r"""Smoothing length scale :math:`L` (length units, **unit-aware**). L-valued view on :attr:`smoothing` with the convention :math:`L = \sqrt{\alpha}`. Setting ``smoothing_length = L`` is equivalent to setting ``smoothing = L**2``, but is the natural physical knob because :math:`L` is what the smoother actually filters by. Mathematical meaning -------------------- The projection then solves the screened-Poisson equation .. math:: u \;-\; L^{2}\,\nabla^{2} u \;=\; \tilde f, whose Green's function decays as :math:`\exp(-r/L)`. In practice this acts like a Gaussian convolution of width :math:`L`: * features in :math:`\tilde f` with spatial scale :math:`\ll L` are damped (roughly as :math:`1/(1+k^{2}L^{2})` for a wavenumber :math:`k`); * features with scale :math:`\gg L` are preserved; * features at :math:`\sim L` are attenuated by a factor near ``1/2``. Choosing :math:`L` smaller than the local mesh size has essentially no effect (the field is already band-limited by the discretisation). A useful default for *light* de-noising is :math:`L \approx 1\!-\!2\,h`, where :math:`h` is a representative cell size. Units ----- The setter accepts a plain number (assumed already non-dimensional), a pint ``Quantity`` with length units, or any unit-aware object understood by :func:`underworld3.non_dimensionalise`. Internally the squared non-dimensional value is stored in ``self._smoothing`` (so ``smoothing`` and ``smoothing_length`` stay consistent). The getter returns a Pint ``Quantity`` with length units when a scaling context is configured; otherwise the plain non-dimensional float :math:`\sqrt{\alpha}`. """ import sympy s = self._smoothing try: sval = float(s) except (TypeError, ValueError): return sympy.sqrt(s) if sval < 0: return None L_nd = sval ** 0.5 # Re-dimensionalise to length units if a scaling context # is set; fall back to the plain ND float otherwise. try: return uw.scaling.dimensionalise( L_nd, uw.scaling.units.meter) except Exception: return L_nd @smoothing_length.setter def smoothing_length(self, L): """Set the smoothing length scale. Accepts a Pint Quantity (with length units), a UnitAware scalar, or a plain non-dimensional number. The value is non-dimensionalised through the active scaling context before being squared and stored as ``self._smoothing``. """ self._needs_function_rewire = True # Unit-aware: route through non_dimensionalise so the # caller can pass `2.0 * uw.scaling.units.meter` or a # plain float interchangeably. try: L_nd = uw.non_dimensionalise(L) except Exception: # Fall back to magnitude-or-float coercion if the # value doesn't carry/expect units. if hasattr(L, "magnitude"): L_nd = L.magnitude else: L_nd = L self._smoothing = sympify(L_nd) ** 2 @property def uw_weighting_function(self): """Weighting function applied during projection.""" return self._uw_weighting_function @uw_weighting_function.setter def uw_weighting_function(self, user_uw_function): """Set the weighting function for the projection.""" self._needs_function_rewire = True self._uw_weighting_function = user_uw_function
## -------------------------------- ## Project from pointwise vector ## functions to nodal point unknowns ## ## We can add boundary constraints in the usual way (parent class handles this) ## We can add smoothing (which takes the form of a viscosity term) ## We could consider an incompressibility constraint ... or remove null spaces ## --------------------------------
[docs] class SNES_Vector_Projection(SNES_Vector): r""" Vector projection solver for mapping vector functions to mesh variables. Solves :math:`\mathbf{u} = \tilde{\mathbf{f}}` where :math:`\tilde{\mathbf{f}}` is a vector function that can be evaluated within an element and :math:`\mathbf{u}` is a vector mesh variable with associated shape functions. Typically used to obtain a continuous representation of a vector function not well-defined at mesh nodes (e.g., gradient or flux vectors), or as a length-scale-aware smoother of an existing vector field. Strong form (screened-Poisson, vector-valued) --------------------------------------------- .. math:: \mathbf{u} \;-\; \nabla \cdot \left( \alpha\, \nabla \mathbf{u} \right) \;+\; \lambda \left( \nabla \cdot \mathbf{u} \right) \mathbf{I} \;=\; \tilde{\mathbf{f}} . The :math:`\alpha`-term is the same screened-Poisson smoother as in :class:`SNES_Projection`, applied component-wise: it has the same :math:`L = \sqrt{\alpha}` smoothing-length interpretation and the same :math:`\exp(-r/L)` Green's function. The extra :math:`\lambda` term is a divergence penalty (see :attr:`penalty`) — set it nonzero when you want an approximately solenoidal projection of :math:`\tilde{\mathbf{f}}`. Setting :math:`\alpha = 0` (and :math:`\lambda = 0`) gives a pure pointwise L2 projection. See :class:`SNES_Projection` for the full mathematical context; the relationship between :attr:`smoothing` (units length²) and :attr:`smoothing_length` (units length) is the same as for the scalar projection. Parameters ---------- mesh : Mesh The computational mesh. u_Field : MeshVariable, optional Target vector mesh variable for the projection. degree : int, default=2 Polynomial degree for the finite element space. verbose : bool, default=False Enable verbose output. See Also -------- SNES_Projection : Scalar field projection (full mathematical detail). SNES_Tensor_Projection : Tensor field projection. """
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, u_Field: uw.discretisation.MeshVariable = None, degree=2, verbose=False, ): super().__init__( mesh, u_Field, degree, verbose, ) self._needs_function_rewire = True self._smoothing = 0.0 self._penalty = 0.0 self._uw_weighting_function = 1.0 self._uw_function = sympy.Matrix([[0] * self.mesh.dim]) # Default: project zero vector self._constitutive_model = uw.constitutive_models.Constitutive_Model(self.Unknowns) return
# Use Template for persistent read-only template expressions F0 = Template( r"f_0 \left( \mathbf{u} \right)", lambda self: (self.u.sym - self.uw_function) * self.uw_weighting_function, "Vector projection pointwise misfit term: f_0(u)", ) F1 = Template( r"\mathbf{F}_1\left( \mathbf{u} \right)", lambda self: ( self.smoothing * self.Unknowns.E + self.penalty * self.mesh.vector.divergence(self.u.sym) * sympy.eye(self.mesh.dim) ), "Vector projection pointwise smoothing term: F_1(u)", )
[docs] @timing.routine_timer_decorator def projection_problem_description(self): """Build residual terms for vector projection FEM assembly.""" # residual terms - defines the problem: # solve for a best fit to the continuous mesh # variable given the values in self.function # F0 is left in place for the user to inject # non-linear constraints if required self._f0 = self.F0.sym # F1 is left in the users control ... e.g to add other gradient constraints to the stiffness matrix self._f1 = ( self.F1.sym + self.smoothing * self.Unknowns.E + self.penalty * self.mesh.vector.divergence(self.u.sym) * sympy.eye(self.mesh.dim) ) return
# Use SymbolicProperty for automatic unwrapping uw_function = SymbolicProperty(matrix_wrap=True, doc="Vector function to project onto mesh") @property def smoothing(self): r"""Smoothing coefficient :math:`\alpha` (units **length²**). Coefficient of the :math:`\nabla\!\cdot\!(\alpha\,\nabla \mathbf u)` term in the vector screened-Poisson equation (see the class docstring). Acts component-wise; the smoothing length is :math:`L = \sqrt{\alpha}` and the Green's function decays as :math:`\exp(-r/L)`. Use :attr:`smoothing_length` for the L-valued, unit-aware knob. See :attr:`SNES_Projection.smoothing` for the full derivation and usage patterns. """ return self._smoothing @smoothing.setter def smoothing(self, smoothing_factor): """Set the smoothing regularization parameter.""" self._needs_function_rewire = True self._smoothing = sympify(smoothing_factor) @property def smoothing_length(self): r"""Smoothing length :math:`L` (length units, **unit-aware**). L-valued view on :attr:`smoothing`, with :math:`L = \sqrt{\alpha}`. The projection then acts as a component-wise screened-Poisson smoother :math:`\mathbf u - L^{2}\,\nabla^{2}\mathbf u = \tilde{\mathbf f}`, i.e. a Gaussian-like convolution of width :math:`L` applied to each Cartesian component of the input vector field. Choose :math:`L \gtrsim h` (a cell size) for noticeable smoothing; :math:`L < h` does essentially nothing because the discretisation already band-limits at that scale. See :attr:`SNES_Projection.smoothing_length` for the full mathematical and units discussion. """ import sympy s = self._smoothing try: sval = float(s) except (TypeError, ValueError): return sympy.sqrt(s) if sval < 0: return None L_nd = sval ** 0.5 try: return uw.scaling.dimensionalise( L_nd, uw.scaling.units.meter) except Exception: return L_nd @smoothing_length.setter def smoothing_length(self, L): """Set the smoothing length scale (unit-aware).""" self._needs_function_rewire = True try: L_nd = uw.non_dimensionalise(L) except Exception: if hasattr(L, "magnitude"): L_nd = L.magnitude else: L_nd = L self._smoothing = sympify(L_nd) ** 2 @property def penalty(self): r"""Divergence penalty :math:`\lambda` for (approx.) incompressibility. Coefficient of the :math:`\lambda (\nabla\!\cdot\!\mathbf u) \mathbf I` term in the vector projection. Large positive values bias the projection toward :math:`\nabla\!\cdot\!\mathbf u = 0`, i.e. a solenoidal approximation of :math:`\tilde{\mathbf f}`. Has no length interpretation — unlike :attr:`smoothing` it does not introduce a filter scale. """ return self._penalty @penalty.setter def penalty(self, value): """Set the divergence penalty parameter.""" self._needs_function_rewire = True symval = sympify(value) self._penalty = symval @property def uw_weighting_function(self): """Weighting function applied during projection.""" return self._uw_weighting_function @uw_weighting_function.setter def uw_weighting_function(self, user_uw_function): """Set the weighting function for the projection.""" self._needs_function_rewire = True self._uw_weighting_function = user_uw_function
class SNES_Tensor_Projection(SNES_Projection): r""" Tensor projection solver for mapping tensor functions to mesh variables. Solves :math:`\mathbf{u} = \tilde{\mathbf{f}}` where :math:`\tilde{\mathbf{f}}` is a tensor-valued function that can be evaluated within an element and :math:`\mathbf{u}` is a tensor mesh variable with associated shape functions. Typically used to obtain a continuous representation of a tensor function not well-defined at mesh nodes (e.g., stress or strain tensors), with optional length-scale smoothing. Strong form (screened-Poisson, applied component-wise) ------------------------------------------------------ Internally the solve is decomposed into scalar sub-problems, one per tensor component :math:`u_{ij}`; each sub-problem is .. math:: u_{ij} \;-\; \nabla \cdot \left( \alpha\, \nabla u_{ij} \right) \;=\; \tilde{f}_{ij}, identical to :class:`SNES_Projection`. The smoothing length is :math:`L = \sqrt{\alpha}` and the Green's function decays as :math:`\exp(-r/L)`; setting :math:`\alpha = 0` gives a pure pointwise L2 projection. See :class:`SNES_Projection` for the full derivation, the choice between :attr:`smoothing` (length²) and :attr:`smoothing_length` (length, unit-aware), and guidance on picking :math:`L` relative to the cell size. Parameters ---------- mesh : Mesh The computational mesh. tensor_Field : MeshVariable, optional Target tensor mesh variable for the projection. scalar_Field : MeshVariable, optional Scalar work variable used internally. degree : int, default=2 Polynomial degree for the finite element space. verbose : bool, default=False Enable verbose output. Notes ----- Currently implemented component-wise as there is no native solver for tensor unknowns. Each component sees the same :math:`\alpha`, so the effective smoothing length :math:`L` is uniform across the tensor entries. See Also -------- SNES_Projection : Scalar field projection (full mathematical detail). SNES_Vector_Projection : Vector field projection. """ @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, tensor_Field: uw.discretisation.MeshVariable = None, scalar_Field: uw.discretisation.MeshVariable = None, degree=2, verbose=False, ): self.t_field = tensor_Field super().__init__( mesh=mesh, u_Field=scalar_Field, degree=degree, verbose=verbose, ) return ## Need to over-ride solve method to run over all components @timing.routine_timer_decorator def solve(self, verbose=False, divergence_retries: int = 0): """Solve by projecting each tensor component sequentially. Parameters ---------- verbose : bool divergence_retries : int, default=0 Forwarded to each per-component SNES solve. 0 preserves legacy behaviour. """ # Loop over the components of the tensor. If this is a symmetric # tensor, we'll usually be given the 1d form to prevent duplication # if self.t_field.sym_1d.shape != self.uw_function.shape: # raise ValueError( # "Tensor shapes for uw_function and MeshVariable are not the same" # ) symm = self.t_field.sym.is_symmetric() for i in range(self.uw_function.shape[0]): for j in range(self.uw_function.shape[1]): if symm and j > i: continue self.uw_scalar_function = sympy.Matrix([[self.uw_function[i, j]]]) self.u.array[:, 0, 0] = self.t_field.array[:, i, j] # solve the projection for the scalar sub-problem super().solve(verbose=verbose) self.t_field.array[:, i, j] = self.u.array[:, 0, 0] # That might be all ... # This is re-defined so it uses uw_scalar_function @property def F0(self): """Pointwise misfit term for scalar subproblem.""" f0_val = expression( r"f_0 \left( \mathbf{u} \right)", (self.u.sym - self.uw_scalar_function) * self.uw_weighting_function, "Scalar subproblem of tensor projection: f_0(u)", ) # backward compatibility self._f0 = f0_val return f0_val @property def F1(self): """Pointwise smoothing flux term for scalar subproblem.""" F1_val = expression( r"\mathbf{F}_1\left( \mathbf{u} \right)", self.smoothing * self.mesh.vector.gradient(self.u.sym), "Scalar subproblem of tensor projection (smoothing): F_1(u)", ) # backward compatibility self._f1 = F1_val return F1_val @property def uw_scalar_function(self): """Current scalar component function being projected.""" return self._uw_scalar_function @uw_scalar_function.setter def uw_scalar_function(self, user_uw_function): """Set the scalar component function for current tensor element.""" self._needs_function_rewire = True self._uw_scalar_function = user_uw_function class SNES_MultiComponent_Projection(SNES_MultiComponent): r""" Multi-component projection solver. Projects an N-component row-matrix expression onto an N-component mesh variable in a **single** SNES solve, sharing one DM across all components. Replaces the per-component cycling used by :class:`SNES_Tensor_Projection`, which tears down and rebuilds the PETSc DM on every inner iteration. Strong form (block-diagonal screened Poisson) --------------------------------------------- There is no cross-component coupling, so each component :math:`u_k,\ k=1,\dots,N_c` satisfies the same scalar screened-Poisson equation as :class:`SNES_Projection`, .. math:: u_k \;-\; \nabla \cdot \left( \alpha\, \nabla u_k \right) \;=\; \tilde f_k. Setting :math:`\alpha = 0` gives a pure pointwise L2 projection per component. The smoothing length is :math:`L = \sqrt{\alpha}` and the Green's function decays as :math:`\exp(-r/L)` — the same Gaussian-like convolution interpretation as the scalar case. All components share a single :math:`\alpha`, so the smoothing scale :math:`L` is uniform across the multi-component target. Use :attr:`smoothing` to set :math:`\alpha` (units length²) or :attr:`smoothing_length` for the L-valued, unit-aware knob. See :class:`SNES_Projection` for the full derivation and guidance. Parameters ---------- mesh : Mesh u_Field : MeshVariable, optional Target ``(1, n_components)`` MATRIX variable. If ``None``, one is created. n_components : int, optional Number of scalar components; required if ``u_Field`` is ``None``. degree : int, default=2 verbose : bool, default=False See Also -------- SNES_Projection : Scalar projection (Nc=1) — full mathematical detail. SNES_Tensor_Projection : Legacy per-component cycling projector. """ @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, u_Field: uw.discretisation.MeshVariable = None, n_components: int = None, degree=2, verbose=False, ): super().__init__( mesh, u_Field=u_Field, n_components=n_components, degree=degree, verbose=verbose, ) self._needs_function_rewire = True self._smoothing = sympify(0) self._uw_weighting_function = sympify(1) self._uw_function = sympy.zeros(1, self._n_components) self._constitutive_model = uw.constitutive_models.Constitutive_Model(self.Unknowns) F0 = Template( r"f_0 \left( \mathbf{u} \right)", lambda self: (self.u.sym - self.uw_function) * self.uw_weighting_function, "Per-component projection misfit (row matrix shape (1, Nc)).", ) F1 = Template( r"\mathbf{F}_1 \left( \mathbf{u} \right)", lambda self: self.smoothing * self.Unknowns.L, "Per-component block-diagonal Laplacian smoothing (shape (Nc, dim)).", ) uw_function = SymbolicProperty( matrix_wrap=True, doc="Row matrix (1, n_components) of expressions to project.", ) @property def smoothing(self): r"""Smoothing coefficient :math:`\alpha` (units **length²**). Coefficient of the :math:`\nabla\!\cdot\!(\alpha\,\nabla u_k)` term in each component's screened-Poisson sub-problem (see the class docstring). One :math:`\alpha` is shared across all :math:`N_c` components, so the implied smoothing length :math:`L = \sqrt{\alpha}` is uniform. Use :attr:`smoothing_length` for the L-valued, unit-aware knob. See :attr:`SNES_Projection.smoothing` for the full derivation and the Gaussian-like convolution interpretation. """ return self._smoothing @smoothing.setter def smoothing(self, value): self._needs_function_rewire = True self._smoothing = sympify(value) @property def smoothing_length(self): r"""Smoothing length :math:`L` (length units, **unit-aware**). L-valued view on :attr:`smoothing`, with :math:`L = \sqrt{\alpha}`. Each component then satisfies :math:`u_k - L^{2}\,\nabla^{2} u_k = \tilde f_k`, i.e. a Gaussian-like convolution of width :math:`L` applied independently to each component of the multi-component target. See :attr:`SNES_Projection.smoothing_length` for the full mathematical and units discussion. """ import sympy s = self._smoothing try: sval = float(s) except (TypeError, ValueError): return sympy.sqrt(s) if sval < 0: return None L_nd = sval ** 0.5 try: return uw.scaling.dimensionalise( L_nd, uw.scaling.units.meter) except Exception: return L_nd @smoothing_length.setter def smoothing_length(self, L): """Set the smoothing length scale (unit-aware).""" self._needs_function_rewire = True try: L_nd = uw.non_dimensionalise(L) except Exception: if hasattr(L, "magnitude"): L_nd = L.magnitude else: L_nd = L self._smoothing = sympify(L_nd) ** 2 @property def uw_weighting_function(self): """Weighting function applied during projection.""" return self._uw_weighting_function @uw_weighting_function.setter def uw_weighting_function(self, value): self._needs_function_rewire = True self._uw_weighting_function = value # ################################################# # # Swarm-based advection-diffusion # # solver based on SNES_Poisson and swarm-variable # # projection # # # #################################################
[docs] class SNES_AdvectionDiffusion(SNES_Scalar): r""" Advection-diffusion equation solver using semi-Lagrangian Crank-Nicolson. Implements the characteristics-based method described in Spiegelman & Katz (2006): .. math:: \underbrace{\frac{\partial u}{\partial t} + \left( \mathbf{v} \cdot \nabla \right) u}_{\dot{u}} - \nabla \cdot \underbrace{\left[ \boldsymbol{\kappa} \nabla u \right]}_{\mathbf{F}} = \underbrace{f}_{\mathbf{h}} The flux term :math:`\mathbf{F}` relates diffusive fluxes to gradients in :math:`u`. Advective fluxes along the velocity field :math:`\mathbf{v}` are handled in the :math:`\dot{u}` term. The time derivative :math:`\dot{u}` involves upstream sampling to find :math:`u^*`, the value of :math:`u` at points which later arrive at mesh nodes. This is achieved using a hidden swarm variable advected backwards from nodal points automatically during solve. Parameters ---------- mesh : Mesh The computational mesh. u_Field : MeshVariable Mesh variable for the transported scalar. V_fn : MeshVariable or sympy.Basic Velocity field for advection. order : int, default=1 Time integration order (1 or 2). restore_points_func : callable, optional Function to restore particles to valid domain. verbose : bool, default=False Enable verbose output. DuDt : SemiLagrangian_DDt or Lagrangian_DDt, optional Time derivative operator for the unknown. DFDt : SemiLagrangian_DDt or Lagrangian_DDt, optional Time derivative operator for the flux. monotone_mode : str or None, optional Monotonicity limiter for the semi-Lagrangian trace-back. Forwarded to the internally-constructed ``SemiLagrangian_DDt`` instances for ``DuDt`` and ``DFDt``. - ``None`` (default): pure FE trace-back. Can overshoot at non-nodal upstream points in cells with sharp gradients (e.g. thin boundary layers in deformed cells); legacy behaviour. - ``"clamp"``: clip the FE trace-back result to ``[nbr_min, nbr_max]`` of the ``k = dim + 1`` nearest ``psi_star`` DOFs. Bit-identical to pure FE in smooth regions; bounds overshoots where the FE interpolant would otherwise leave the local data range. - ``"pick"``: keep the FE result where in-bounds; for out-of-bounds DOFs only, re-evaluate via Shepard's-method RBF. More conservative than clamp at the catastrophe edge. When a user supplies a pre-built ``DuDt``, this kwarg is applied to the internally-constructed ``DFDt`` only — the user's ``DuDt`` already encodes whatever ``monotone_mode`` it was constructed with. theta : float, default=0.5 Adams-Moulton theta for the diffusive flux at order 1. Forwarded to the internally-constructed ``SemiLagrangian_DDt`` instances (same forwarding rule as ``monotone_mode``). - ``0.5`` (default): Crank-Nicolson, A-stable but not L-stable. Second-order accurate. Rings on stiff modes with sharp gradients (negative amplification factor). - ``1.0``: Backward Euler, L-stable, monotone for the diffusive flux. First-order accurate. Recommended when SLCN+CN ringing dominates the discretisation error. - ``0.0``: Forward Euler — unstable for stiff diffusion; included for completeness. Notes ----- - The diffusivity :math:`\kappa` is set via the ``constitutive_model`` property - Sources :math:`f` can be any sympy expression involving mesh/swarm variables References ---------- Spiegelman, M., & Katz, R. F. (2006). A semi-Lagrangian Crank-Nicolson algorithm for the numerical solution of advection-diffusion problems. *Geochemistry, Geophysics, Geosystems*, 7(4). https://doi.org/10.1029/2005GC001073 See Also -------- SNES_Diffusion : Pure diffusion solver without advection. SNES_Navier_Stokes : Full momentum advection-diffusion. """ def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() ## feedback on this instance display(Latex(r"$\quad\mathrm{u} = $ " + self.u.sym._repr_latex_())) display(Latex(r"$\quad\mathbf{v} = $ " + self._V_fn._repr_latex_())) display(Latex(r"$\quad\Delta t = $ " + self.delta_t._repr_latex_()))
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, u_Field: uw.discretisation.MeshVariable, V_fn: Union[uw.discretisation.MeshVariable, sympy.Basic], # Should be a sympy function order: int = 1, restore_points_func: Callable = None, verbose=False, DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, monotone_mode: Optional[str] = None, theta: float = 0.5, ): ## Parent class will set up default values etc super().__init__( mesh, u_Field, u_Field.degree, verbose, DuDt=DuDt, DFDt=DFDt, ) if isinstance(V_fn, uw.discretisation.MeshVariable): self._V_fn = V_fn.sym else: self._V_fn = V_fn # default values for properties self.f = sympy.Matrix.zeros(1, 1) self._constitutive_model = None # These are unique to the advection solver self._delta_t = expression(R"\Delta t", 0, "Physically motivated timestep") self._needs_function_rewire = True self.restore_points_to_domain_func = restore_points_func ### Setup the history terms ... This version should not build anything ### by default - it's the template / skeleton ## NB - Smoothing is generally required for stability. 0.0001 is effective ## at the various resolutions tested. if DuDt is None: self.Unknowns.DuDt = SemiLagrangian_DDt( self.mesh, u_Field.sym, # Symbolic expression - SemiLagrangian evaluates this at each update self._V_fn, vtype=uw.VarType.SCALAR, degree=u_Field.degree, continuous=u_Field.continuous, varsymbol=u_Field.symbol, verbose=verbose, bcs=self.essential_bcs, order=1, smoothing=0.0, monotone_mode=monotone_mode, theta=theta, ) else: # validation if order is None: order = DuDt.order else: if DuDt.order < order: raise RuntimeError( f"DuDt supplied is order {DuDt.order} but order requested is {order}" ) self.Unknowns.DuDt = DuDt self.Unknowns.DFDt = SemiLagrangian_DDt( self.mesh, sympy.Matrix([[0] * self.mesh.dim]), # Actual function is not defined at this point self._V_fn, vtype=uw.VarType.VECTOR, degree=u_Field.degree, continuous=True, # The default is now to match the above and avoid # any use of projection. # swarm_degree=u_Field.degree - 1, # swarm_continuous=False, varsymbol=rf"{{F[ {self.u.symbol} ] }}", verbose=verbose, bcs=None, order=order, smoothing=0.0, monotone_mode=monotone_mode, theta=theta, ) return
@property def F0(self): """Pointwise source term including time derivative.""" f0 = expression( r"f_0 \left( \mathbf{u} \right)", -self.f + self.DuDt.bdf(0) / self.delta_t, "Poisson pointwise force term: f_0(u)", ) # backward compatibility self._f0 = f0 return f0 @property def F1(self): """Pointwise diffusive flux term (Adams-Moulton integration).""" F1_val = expression( r"\mathbf{F}_1\left( \mathbf{u} \right)", self.DFDt.adams_moulton_flux(), "Poisson pointwise flux term: F_1(u)", ) # backward compatibility self._f1 = F1_val return F1_val
[docs] def adv_diff_slcn_problem_description(self): """Build residual terms for advection-diffusion FEM assembly.""" # f0 residual term self._f0 = self.F0.sym # f1 residual term self._f1 = self.F1.sym return
@property def f(self): r"""Source term for the advection-diffusion equation. The source :math:`f` appears on the right-hand side: .. math:: \frac{\partial u}{\partial t} + \mathbf{V} \cdot \nabla u = \nabla \cdot (\kappa \nabla u) + f Returns ------- sympy.Matrix Source term expression. """ return self._f @f.setter def f(self, value): """Set the volumetric source term.""" self._needs_function_rewire = True self._f = sympy.Matrix((value,)) @property def V_fn(self): r"""Velocity field for advection. The advection velocity :math:`\mathbf{V}` transports the scalar field :math:`u`. Can be a MeshVariable or symbolic expression. Returns ------- sympy.Matrix Velocity vector expression. """ return self._V_fn @V_fn.setter def V_fn(self, value): """ Set the velocity function for advection. Parameters: ----------- value : uw.discretisation.MeshVariable or sympy.Basic Velocity field as either a MeshVariable or sympy expression """ self.is_setup = False # Mark as needing setup when velocity changes if isinstance(value, uw.discretisation.MeshVariable): self._V_fn = value.sym else: self._V_fn = value @property def delta_t(self): r"""Timestep for time integration. The timestep :math:`\Delta t` controls the temporal discretization. For explicit advection, this should satisfy the CFL condition: .. math:: \Delta t < \frac{h}{|\mathbf{V}|} where :math:`h` is the element size and :math:`|\mathbf{V}|` is the velocity magnitude. Returns ------- UWexpression Timestep value. See Also -------- estimate_dt : Computes a stable timestep automatically. """ return self._delta_t @delta_t.setter def delta_t(self, value): """Set the timestep (handles unit conversion if provided).""" # Note: comparison must handle potential UWexpressions / UWQuantities # Use .data or float() to get numeric values for stable comparison try: old_dt = float(self._delta_t.data) new_dt = float(value.data) if hasattr(value, 'data') else float(value) if np.isclose(old_dt, new_dt, rtol=1e-12, atol=1e-15): return except: pass self._needs_function_rewire = True # Handle Pint Quantities with time dimensions if hasattr(value, "dimensionality"): # This is a Pint Quantity - check if it has time dimensions try: from ..scaling import units as ureg time_dim = ureg.second.dimensionality if value.dimensionality == time_dim: # Convert physical time to nondimensional using model time scale model = uw.get_default_model() if model and hasattr(model, "fundamental_scales") and model.fundamental_scales: time_scale = model.fundamental_scales.get("time") if time_scale is not None: # Physical time / time scale = nondimensional time # Must use to_reduced_units() to convert both quantities # to same base units before extracting magnitude. # Otherwise Pint keeps different unit bases (megayear/second) # and .magnitude returns the unconverted number! result = value / time_scale # Get the internal Pint quantity for proper unit conversion if hasattr(result, "_pint_qty"): # UWQuantity - access internal Pint quantity pint_result = result._pint_qty.to_reduced_units() elif hasattr(result, "to_reduced_units"): # Raw Pint Quantity pint_result = result.to_reduced_units() else: pint_result = result # Extract the dimensionless magnitude if hasattr(pint_result, "magnitude"): value = float(pint_result.magnitude) else: value = float(pint_result) except Exception: pass # If anything fails, try to use value as-is self._delta_t.sym = value
[docs] @timing.routine_timer_decorator def estimate_dt(self, direction_aware: bool = False): r""" Estimate an appropriate timestep for the advection-diffusion solver. This is an implicit solver so the returned :math:`\delta t` is the minimum of: - :math:`\delta t_{\textrm{diff}}`: typical time for diffusion across an element - :math:`\delta t_{\textrm{adv}}`: typical element-crossing time for a fluid parcel Parameters ---------- direction_aware : bool, default False If True, the advective dt uses the per-cell extent *along the local velocity direction* — `h_eff_c = max_i(s_i) - min_i(s_i)` where `s_i = (x_i - centroid) · v̂` over the cell vertices. This is the distance material actually traverses through the cell per unit |v|, and is **always ≥ the isotropic mesh._radii estimate**, by 1.5–3× for equant cells (geometric factor) and up to ~10× for cells that the mover has stretched along the flow direction. On adapted meshes the gain is substantial; on uniform meshes it's the geometric factor only. Off by default to preserve historical behaviour; safe to enable everywhere once validated. Returns ------- pint.Quantity or float The recommended timestep with physical time units if a model with reference scales is available, otherwise nondimensional. """ ### required modules from mpi4py import MPI # Use the unified .K property from the constitutive model # This provides diffusivity for diffusion models K = self.constitutive_model.K # Evaluate the diffusivity (handles constant and spatially-varying cases) if isinstance(K, sympy.Expr) or hasattr(K, 'sym'): K_sym = K.sym if hasattr(K, 'sym') else K if uw.function.fn_is_constant_expr(K_sym): diffusivity = uw.function.evaluate( K_sym, np.zeros((1, self.mesh.dim)), ) else: diffusivity = uw.function.evaluate( sympy.sympify(K_sym), self.mesh._centroids, self.mesh.N, ) diffusivity = diffusivity.max() else: diffusivity = K # If diffusivity is unit-aware (UnitAwareArray), nondimensionalise it to get # consistent nondimensional values that match mesh._radii # Note: .magnitude returns physical units, which would be wrong here if hasattr(diffusivity, "units") and diffusivity.units is not None: diffusivity = uw.non_dimensionalise(diffusivity) elif hasattr(diffusivity, "magnitude"): # Plain UWQuantity without units context - use magnitude diffusivity = diffusivity.magnitude # Ensure diffusivity is a plain numpy scalar max_diffusivity = float(np.asarray(diffusivity).max()) ## get global max diffusivity value comm = uw.mpi.comm diffusivity_glob = comm.allreduce(max_diffusivity, op=MPI.MAX) ### get the velocity values at element centroids (nondimensional) vel = uw.function.evaluate( self.V_fn, self.mesh._centroids, ) # If vel is unit-aware (UnitAwareArray), nondimensionalise it to get # consistent nondimensional values that match mesh._radii # Note: .magnitude returns physical units, which would be wrong here if hasattr(vel, "units") and vel.units is not None: vel = uw.non_dimensionalise(vel) elif hasattr(vel, "magnitude"): # Plain UWQuantity without units context - use magnitude vel = vel.magnitude # Ensure vel is a plain numpy array vel = np.asarray(vel) # Squeeze out any singleton dimensions from evaluate (shape: N,1,dim -> N,dim) vel = np.squeeze(vel) # Handle edge case of single element (ensure 2D) if vel.ndim == 1: vel = vel.reshape(1, -1) # Get per-element velocity magnitudes vel_magnitudes = np.linalg.norm(vel, axis=1) # Get per-element radii (characteristic element size) element_radii = self.mesh._radii ## estimate dt of adv and diff components using per-element approach ## dt_adv_i = h_i / |v_i| for advection ## dt_diff_i = h_i^2 / κ for diffusion (using global κ for now) # Per-element diffusive timestep (all elements use same diffusivity) if diffusivity_glob > 0: dt_diff_per_element = (element_radii ** 2) / diffusivity_glob min_dt_diff_local = np.min(dt_diff_per_element) if len(dt_diff_per_element) > 0 else np.inf else: min_dt_diff_local = np.inf # Per-element advective timestep — either isotropic # (mesh._radii / |v|) or direction-aware (v-aligned cell # extent / |v|). if direction_aware: # Per-cell vertex indices (triangle / tet). from underworld3.meshing.smoothing import _tri_cells tris = _tri_cells(self.mesh.dm) if tris is None: # Fall back to isotropic for non-triangle meshes. h_per_element = element_radii else: coords = np.asarray(self.mesh.X.coords) centroids = coords[tris].mean(axis=1) # v-hat per cell (use centroid v we already have) vhat = np.where( vel_magnitudes[:, None] > 0, vel / np.maximum(vel_magnitudes[:, None], 1.0e-30), 0.0) D = coords[tris] - centroids[:, None, :] # Signed projections along v̂ per cell vertex s = np.einsum('cvd,cd->cv', D, vhat) h_per_element = s.max(axis=1) - s.min(axis=1) # Sanity-floor — for zero-velocity cells s=0 # ⇒ h_eff=0 ⇒ dt_adv=inf via the where below h_per_element = np.maximum( h_per_element, 0.0) else: h_per_element = element_radii with np.errstate(divide='ignore', invalid='ignore'): dt_adv_per_element = np.where( vel_magnitudes > 0, h_per_element / vel_magnitudes, np.inf ) min_dt_adv_local = np.min(dt_adv_per_element) if len(dt_adv_per_element) > 0 else np.inf # Get global minimum timesteps (parallel-safe) min_dt_diff_glob = comm.allreduce(min_dt_diff_local, op=MPI.MIN) min_dt_adv_glob = comm.allreduce(min_dt_adv_local, op=MPI.MIN) # Store for user inspection self.dt_adv = min_dt_adv_glob if not np.isinf(min_dt_adv_glob) else 0.0 self.dt_diff = min_dt_diff_glob if not np.isinf(min_dt_diff_glob) else 0.0 # Take overall minimum (respecting infinity for zero velocity/diffusivity cases) dt_estimate = min(min_dt_diff_glob, min_dt_adv_glob) # If both are infinite (no velocity and no diffusivity), return infinity if np.isinf(dt_estimate): return np.inf # Dimensionalise the result to physical time try: return uw.dimensionalise(np.squeeze(dt_estimate), {'[time]': 1}) except Exception: # Fallback: return plain nondimensional number return np.squeeze(dt_estimate)
[docs] @timing.routine_timer_decorator def solve( self, zero_init_guess: bool = True, timestep: float = None, _force_setup: bool = False, _evalf=False, verbose=False, divergence_retries: int = 0, ): """ Generates solution to constructed system. Params ------ zero_init_guess: If `True`, a zero initial guess will be used for the system solution. Otherwise, the current values of `self.u` will be used. divergence_retries: If SNES reports DIVERGED, retry with warm start up to this many times. 0 preserves legacy behaviour. """ if timestep is not None and timestep != self.delta_t: self.delta_t = timestep # this will force an initialisation because the functions need to be updated if _force_setup: self._needs_function_rewire = True if not self.constitutive_model._solver_is_setup: self._needs_function_rewire = True self.DFDt.psi_fn = self.constitutive_model.flux.T if not self.is_setup: self._setup_pointwise_functions(verbose) self._setup_discretisation(verbose) self._setup_solver(verbose) # Update History / Flux History terms # SemiLagrange and Lagrange may have different sequencing. self.DuDt.update_pre_solve(timestep, verbose=verbose, evalf=_evalf) self.DFDt.update_pre_solve(timestep, verbose=verbose, evalf=_evalf) super().solve(zero_init_guess, _force_setup, divergence_retries=divergence_retries) # Invalidate cached data views - PETSc may have replaced underlying buffers # This ensures .data and .array properties return fresh data from PETSc # Handle both EnhancedMeshVariable (has _base_var) and direct _MeshVariable target_var = getattr(self.u, "_base_var", self.u) if hasattr(target_var, "_canonical_data"): target_var._canonical_data = None self.DuDt.update_post_solve(timestep, verbose=verbose, evalf=_evalf) self.DFDt.update_post_solve(timestep, verbose=verbose, evalf=_evalf) self.is_setup = True self.constitutive_model._solver_is_setup = True return
[docs] class SNES_Diffusion(SNES_Scalar): r""" Diffusion equation solver using mesh-based finite elements. Solves the scalar diffusion equation: .. math:: \underbrace{\frac{\partial u}{\partial t}}_{\dot{f}} - \nabla \cdot \underbrace{\left[ \boldsymbol{\kappa} \nabla u \right]}_{\mathbf{F}} = \underbrace{f}_{h} The flux term :math:`\mathbf{F}` relates diffusive fluxes to gradients in the unknown :math:`u`. Parameters ---------- mesh : Mesh The computational mesh. u_Field : MeshVariable Mesh variable for the diffusing scalar. order : int, default=1 Time integration order. theta : float, default=0.0 Time integration parameter (0=explicit, 0.5=Crank-Nicolson, 1=implicit). evalf : bool, default=False Numerically evaluate symbolic expressions during setup. verbose : bool, default=False Enable verbose output. DuDt : Eulerian_DDt, SemiLagrangian_DDt, or Lagrangian_DDt, optional Time derivative operator for the unknown. DFDt : Eulerian_DDt, SemiLagrangian_DDt, or Lagrangian_DDt, optional Time derivative operator for the flux. Notes ----- - The diffusivity :math:`\kappa` is set via the ``constitutive_model`` property - Sources :math:`f` can be any sympy expression involving mesh/swarm variables See Also -------- SNES_AdvectionDiffusion : Adds advection transport. SNES_Poisson : Steady-state diffusion (no time derivative). """ def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() ## feedback on this instance display(Latex(r"$\quad\mathrm{u} = $ " + self.u.sym._repr_latex_())) display(Latex(r"$\quad\Delta t = $ " + self.delta_t._repr_latex_()))
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, u_Field: uw.discretisation.MeshVariable, order: int = 1, theta: float = 0.0, evalf: Optional[bool] = False, verbose=False, DuDt: Union[Eulerian_DDt, SemiLagrangian_DDt, Lagrangian_DDt] = None, DFDt: Union[Eulerian_DDt, SemiLagrangian_DDt, Lagrangian_DDt] = None, ): ## Parent class will set up default values etc super().__init__( mesh, u_Field, u_Field.degree, verbose, DuDt=DuDt, DFDt=DFDt, ) # default values for properties self.f = sympy.Matrix.zeros(1, 1) self._constitutive_model = None self.theta = theta # These are unique to the advection solver self._delta_t = expression(R"\Delta t", 0, "Physically motivated timestep") self._needs_function_rewire = True ### Setup the history terms ... This version should not build anything ### by default - it's the template / skeleton ## NB - Smoothing is generally required for stability. 0.0001 is effective ## at the various resolutions tested. if DuDt is None: self.Unknowns.DuDt = Eulerian_DDt( self.mesh, u_Field, vtype=uw.VarType.SCALAR, degree=u_Field.degree, continuous=u_Field.continuous, varsymbol=u_Field.symbol, verbose=verbose, evalf=evalf, bcs=self.essential_bcs, order=order, smoothing=0.0, ) else: # validation if order is None: order = DuDt.order else: if DuDt.order < order: raise RuntimeError( f"DuDt supplied is order {DuDt.order} but order requested is {order}" ) self.Unknowns.DuDt = DuDt if DFDt is None: self.Unknowns.DFDt = Symbolic_DDt( sympy.Matrix([[0] * self.mesh.dim]), varsymbol=rf"{{F[ {self.u.symbol} ] }}", theta=theta, bcs=None, order=order, ) ### solution unable to solve after n timesteps, due to the projection of flux term (???) # self.Unknowns.DFDt = Eulerian_DDt( # self.mesh, # sympy.Matrix( # [[0] * self.mesh.dim] # ), # Actual function is not defined at this point # vtype=uw.VarType.VECTOR, # degree=u_Field.degree, # continuous=u_Field.continuous, # varsymbol=rf"{{F[ {self.u.symbol} ] }}", # theta=theta, # evalf=evalf, # verbose=verbose, # bcs=None, # order=order, # smoothing=0.0, # ) return
@property def F0(self): """Pointwise source term including time derivative.""" f0 = expression( r"f_0 \left( \mathbf{u} \right)", -self.f + self.DuDt.bdf() / self.delta_t, "Diffusion pointwise force term: f_0(u)", ) # backward compatibility self._f0 = f0 return f0 @property def F1(self): """Pointwise diffusive flux term.""" F1_val = expression( r"\mathbf{F}_1\left( \mathbf{u} \right)", self.DFDt.adams_moulton_flux(), "Diffusion pointwise flux term: F_1(u)", ) # backward compatibility self._f1 = F1_val return F1_val @property def f(self): """Source term for the diffusion equation.""" return self._f @f.setter def f(self, value): """Set the volumetric source term.""" self._needs_function_rewire = True self._f = sympy.Matrix((value,)) @property def delta_t(self): """Timestep for time integration.""" return self._delta_t @delta_t.setter def delta_t(self, value): """Set the timestep (handles unit conversion if provided).""" # Note: comparison must handle potential UWexpressions / UWQuantities # Use .data or float() to get numeric values for stable comparison try: old_dt = float(self._delta_t.data) new_dt = float(value.data) if hasattr(value, 'data') else float(value) if np.isclose(old_dt, new_dt, rtol=1e-12, atol=1e-15): return except: pass self._needs_function_rewire = True # Handle Pint Quantities with time dimensions if hasattr(value, "dimensionality"): # This is a Pint Quantity - check if it has time dimensions try: from ..scaling import units as ureg time_dim = ureg.second.dimensionality if value.dimensionality == time_dim: # Convert physical time to nondimensional using model time scale model = uw.get_default_model() if model and hasattr(model, "fundamental_scales") and model.fundamental_scales: time_scale = model.fundamental_scales.get("time") if time_scale is not None: # Physical time / time scale = nondimensional time # Must use to_reduced_units() to convert both quantities # to same base units before extracting magnitude. # Otherwise Pint keeps different unit bases (megayear/second) # and .magnitude returns the unconverted number! result = value / time_scale # Get the internal Pint quantity for proper unit conversion if hasattr(result, "_pint_qty"): # UWQuantity - access internal Pint quantity pint_result = result._pint_qty.to_reduced_units() elif hasattr(result, "to_reduced_units"): # Raw Pint Quantity pint_result = result.to_reduced_units() else: pint_result = result # Extract the dimensionless magnitude if hasattr(pint_result, "magnitude"): value = float(pint_result.magnitude) else: value = float(pint_result) except Exception: pass # If anything fails, try to use value as-is self._delta_t.sym = value
[docs] @timing.routine_timer_decorator def estimate_dt(self): r""" Estimate an appropriate timestep for the diffusion solver. This solver only has a diffusive component, so the returned :math:`\delta t` is: - :math:`\delta t_{\textrm{diff}}`: typical time for diffusion across an element Returns ------- pint.Quantity or float The diffusive timestep with physical time units if a model with reference scales is available, otherwise nondimensional. """ ### required modules from mpi4py import MPI # Use the unified .K property from the constitutive model # This provides diffusivity for diffusion models K = self.constitutive_model.K # Evaluate the diffusivity (handles constant and spatially-varying cases) if isinstance(K, sympy.Expr) or hasattr(K, 'sym'): K_sym = K.sym if hasattr(K, 'sym') else K if uw.function.fn_is_constant_expr(K_sym): diffusivity = uw.function.evaluate( K_sym, np.zeros((1, self.mesh.dim)), ) else: diffusivity = uw.function.evaluate( sympy.sympify(K_sym), self.mesh._centroids, self.mesh.N, ) diffusivity = diffusivity.max() else: diffusivity = K # If diffusivity is unit-aware (UnitAwareArray), nondimensionalise it to get # consistent nondimensional values that match mesh._radii # Note: .magnitude returns physical units, which would be wrong here if hasattr(diffusivity, "units") and diffusivity.units is not None: diffusivity = uw.non_dimensionalise(diffusivity) elif hasattr(diffusivity, "magnitude"): # Plain UWQuantity without units context - use magnitude diffusivity = diffusivity.magnitude # Ensure diffusivity is a plain numpy scalar diffusivity = float(np.asarray(diffusivity).max()) ## get global max diffusivity value comm = uw.mpi.comm diffusivity_glob = comm.allreduce(diffusivity, op=MPI.MAX) ## get mesh spacing (nondimensional, consistent with diffusivity) min_dx = self.mesh.get_min_radius() ## estimate dt of diff component (all in nondimensional units) self.dt_diff = 0.0 if diffusivity_glob != 0.0: dt_diff = (min_dx**2) / diffusivity_glob self.dt_diff = dt_diff else: dt_diff = np.inf dt_estimate = dt_diff # Apply unit-aware scaling if the solver has unit-aware variables return _apply_unit_aware_scaling(np.squeeze(dt_estimate), self.u, self.mesh)
[docs] @timing.routine_timer_decorator def solve( self, zero_init_guess: bool = True, timestep: float = None, evalf: bool = False, _force_setup: bool = False, verbose=False, divergence_retries: int = 0, ): """ Generates solution to constructed system. Params ------ zero_init_guess: If `True`, a zero initial guess will be used for the system solution. Otherwise, the current values of `self.u` will be used. divergence_retries: If SNES reports DIVERGED, retry with warm start up to this many times. 0 preserves legacy behaviour. """ if timestep is not None and timestep != self.delta_t: self.delta_t = timestep # this will force an initialisation because the functions need to be updated if _force_setup: self._needs_function_rewire = True if not self.constitutive_model._solver_is_setup: self._needs_function_rewire = True self.DFDt.psi_fn = self.constitutive_model.flux.T # self._flux = self.constitutive_model.flux.T # self._flux_star = self._flux.copy() if not self.is_setup: self._setup_pointwise_functions(verbose) self._setup_discretisation(verbose) self._setup_solver(verbose) # Update History / Flux History terms # SemiLagrange and Lagrange may have different sequencing. self.DuDt.update_pre_solve(timestep, evalf=evalf, verbose=verbose) self.DFDt.update_pre_solve(timestep, evalf=evalf, verbose=verbose) super().solve(zero_init_guess, _force_setup, divergence_retries=divergence_retries) # Invalidate cached data views - PETSc may have replaced underlying buffers target_var = getattr(self.u, "_base_var", self.u) if hasattr(target_var, "_canonical_data"): target_var._canonical_data = None self.DuDt.update_post_solve(timestep, evalf=evalf, verbose=verbose) self.DFDt.update_post_solve(timestep, evalf=evalf, verbose=verbose) self.is_setup = True self.constitutive_model._solver_is_setup = True return
# This one is already updated to work with the Lagrange D_Dt
[docs] class SNES_NavierStokes(SNES_Stokes_SaddlePt): r""" Navier-Stokes equation solver with momentum advection. Provides a solver for the Navier-Stokes (vector advection-diffusion) equation similar to the Semi-Lagrange Crank-Nicolson method (Spiegelman & Katz, 2006) but using distributed upstream sampling from a swarm variable. .. math:: \underbrace{\frac{\partial \mathbf{u}}{\partial t} + \left( \mathbf{u} \cdot \nabla \right) \mathbf{u}}_{\dot{\mathbf{u}}} - \nabla \cdot \underbrace{\left[ \frac{\boldsymbol{\eta}}{2} \left( \nabla \mathbf{u} + \nabla \mathbf{u}^T \right) - p \mathbf{I} \right]}_{\mathbf{F}} = \underbrace{\mathbf{f}}_{\mathbf{h}} The flux term :math:`\mathbf{F}` relates viscous stresses to velocity gradients. Advective momentum transport is handled in the :math:`\dot{\mathbf{u}}` term. The time derivative :math:`\dot{\mathbf{u}}` involves upstream sampling to find :math:`\mathbf{u}^*`, representing velocity at the start of the timestep. This is achieved using a swarm variable that carries history information along flow paths. Parameters ---------- mesh : Mesh The computational mesh. velocityField : MeshVariable Mesh variable for velocity. pressureField : MeshVariable Mesh variable for pressure. rho : float or sympy.Expr Fluid density. order : int, default=1 Time integration order. theta : float, default=0.5 Time integration parameter. p_continuous : bool, default=True If False, use discontinuous pressure elements. verbose : bool, default=False Enable verbose output. DuDt : SemiLagrangian_DDt or Lagrangian_DDt, optional Time derivative operator for velocity. DFDt : SemiLagrangian_DDt or Lagrangian_DDt, optional Time derivative operator for stress. Notes ----- - The viscosity :math:`\eta` is set via the ``constitutive_model`` property - High-order shape functions (cubic or higher) are recommended for accurate history term interpolation - The user must supply and update the swarm variable representing :math:`\mathbf{u}^*` References ---------- Spiegelman, M., & Katz, R. F. (2006). A semi-Lagrangian Crank-Nicolson algorithm for the numerical solution of advection-diffusion problems. *Geochemistry, Geophysics, Geosystems*, 7(4). https://doi.org/10.1029/2005GC001073 See Also -------- SNES_Stokes : Steady-state Stokes flow (no inertia). SNES_AdvectionDiffusion : Scalar advection-diffusion. """ def _object_viewer(self): from IPython.display import Latex, Markdown, display super()._object_viewer() ## feedback on this instance display(Latex(r"$\quad\mathrm{u} = $ " + self.u.sym._repr_latex_())) display(Latex(r"$\quad\mathbf{p} = $ " + self.p.sym._repr_latex_())) display(Latex(r"$\quad\Delta t = $ " + self.delta_t._repr_latex_())) display(Latex(rf"$\quad\rho = $" + self.rho._repr_latex_()))
[docs] @timing.routine_timer_decorator def __init__( self, mesh: uw.discretisation.Mesh, velocityField: uw.discretisation.MeshVariable, pressureField: uw.discretisation.MeshVariable, rho: Optional[float] = 0.0, restore_points_func: Callable = None, order: Optional[int] = 2, flux_order: Optional[int] = None, p_continuous: Optional[bool] = False, verbose: Optional[bool] = False, DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None, ): ## Parent class will set up default values and load u_Field into the solver super().__init__( mesh, velocityField, pressureField, order, p_continuous, verbose, DuDt=DuDt, DFDt=DFDt, ) # These are unique to the advection solver self._delta_t = expression(r"\Delta t", sympy.oo, "Navier-Stokes timestep") self._needs_function_rewire = True self._rho = expression(R"{\uprho}", rho, "Density") self._first_solve = True self._order = order self._flux_order = flux_order # None means follow effective_order self._penalty = expression(R"{\uplambda}", 0, "Incompressibility Penalty") self.restore_points_to_domain_func = restore_points_func self._bodyforce = sympy.Matrix([[0] * self.mesh.dim]).T self._constitutive_model = None # self._E = self.mesh.vector.strain_tensor(self.u.sym) self._Estar = None self._constraints = sympy.Matrix((self.div_u,)) ### sets up DuDt and DFDt ## ._setup_history_terms() # If DuDt is not provided, then we can build a SLCN version if self.Unknowns.DuDt is None: self.Unknowns.DuDt = uw.systems.ddt.SemiLagrangian( self.mesh, self.u.sym, # Symbolic expression - SemiLagrangian evaluates this at each update self.u.sym, vtype=uw.VarType.VECTOR, degree=self.u.degree, continuous=self.u.continuous, varsymbol=self.u.symbol, verbose=self.verbose, bcs=self.essential_bcs, order=self._order, smoothing=0.0001, ) # F (at least for N-S) is a nodal point variable so there is no benefit # to treating it as a swarm variable. We'll define and use our own SL tracker # as we do in the SLCN version. We'll leave the option for an over-ride. # # Maybe u.degree-1. The scalar equivalent seems to show # little benefit from specific choices here other than # discontinuous flux variables amplifying instabilities. self.Unknowns.DFDt = uw.systems.ddt.SemiLagrangian( self.mesh, sympy.Matrix.zeros(self.mesh.dim, self.mesh.dim), self.u.sym, vtype=uw.VarType.SYM_TENSOR, degree=self.u.degree, continuous=self.u.continuous, varsymbol=rf"{{ F[ {self.u.symbol} ] }}", verbose=self.verbose, bcs=None, order=self._order, ) ## Add in the history terms provided ... return
@property def F0(self): """Pointwise momentum source term (body force + inertia).""" DuDt = self.Unknowns.DuDt f0 = expression( r"\mathbf{f}_0\left( \mathbf{u} \right)", -self.bodyforce + self.rho * DuDt.bdf() / self.delta_t, "NStokes pointwise force term: f_0(u)", ) self._u_f0 = f0 return f0 @property def F1(self): """Pointwise stress flux term (viscous + pressure).""" dim = self.mesh.dim DFDt = self.Unknowns.DFDt if DFDt is not None: # We can flag to only do this if the constitutive model has been updated DFDt.psi_fn = self._constitutive_model.flux.T F1 = expression( r"\mathbf{F}_1\left( \mathbf{u} \right)", DFDt.adams_moulton_flux() - sympy.eye(self.mesh.dim) * (self.p.sym[0]) + self.penalty * self.div_u * sympy.eye(dim), "NStokes pointwise flux term: F_1(u)", ) # Is the else condition useful - other than to prevent a crash ? # Yes, because then it can just live on the Stokes solver ... else: F1 = expression( r"\mathbf{F}_1\left( \mathbf{u} \right)", self._constitutive_model.flux.T - sympy.eye(self.mesh.dim) * (self.p.sym[0]), "NStokes pressure gradient term: F_1(u) - No Flux history provided", ) self._u_f1 = F1 return F1 @property def PF0(self): """Pointwise constraint term (continuity equation).""" dim = self.mesh.dim f0 = expression( r"\mathbf{F}_1\left( \mathbf{p} \right)", sympy.Matrix((self.constraints)), "NStokes pointwise flux term: f_0(p)", ) self._p_f0 = f0 return f0 ## Deprecate this function
[docs] def navier_stokes_problem_description(self): """Build residual terms for Navier-Stokes FEM assembly (deprecated).""" # f0 residual term self._u_f0 = self.F0.sym # f1 residual term self._u_f1 = self.F1.sym # p1 residual term self._p_f0 = self.PF0.sym return
@property def delta_t(self): """Timestep for time integration.""" return self._delta_t @delta_t.setter def delta_t(self, value): """Set the timestep value.""" self._needs_function_rewire = True self._delta_t.sym = value @property def rho(self): """Fluid density.""" return self._rho @rho.setter def rho(self, value): """Set the fluid density.""" self._needs_function_rewire = True self._rho.sym = value @property def f(self): """Source term for the momentum equation.""" return self._f @f.setter def f(self, value): """Set the volumetric source term.""" self._needs_function_rewire = True self._f = sympy.Matrix((value,)) @property def div_u(self): r"""Velocity divergence: :math:`\nabla \cdot \mathbf{u} = \mathrm{tr}(\dot{\varepsilon})`.""" E = self.strainrate divergence = E.trace() return divergence @property def strainrate(self): r"""Symmetric strain rate tensor (with 1/2 factor). .. math:: \dot{\varepsilon}_{ij} = \frac{1}{2}\left(\frac{\partial u_i}{\partial x_j} + \frac{\partial u_j}{\partial x_i}\right) """ return sympy.Matrix(self.Unknowns.E) @property def DuDt(self): """Time derivative operator for velocity.""" return self.Unknowns.DuDt @DuDt.setter def DuDt( self, DuDt_value: Union[SemiLagrangian_DDt, Lagrangian_DDt], ): """Set the time derivative operator for velocity.""" self.Unknowns.DuDt = DuDt_value self._solver_is_setup = False @property def DFDt(self): """Time derivative operator for stress flux.""" return self.Unknowns.DFDt @property def constraints(self): """Constraint equation (typically incompressibility).""" return self._constraints @constraints.setter def constraints(self, constraints_matrix): """Set the constraint equation.""" self._is_setup = False symval = sympify(constraints_matrix) self._constraints = symval @property def bodyforce(self): """Body force vector (e.g., gravity).""" return self._bodyforce @bodyforce.setter def bodyforce(self, value): """Set the body force vector.""" self._needs_function_rewire = True self._bodyforce = self.mesh.vector.to_matrix(value) @property def saddle_preconditioner(self): """Preconditioner for the Schur complement.""" return self._saddle_preconditioner @saddle_preconditioner.setter def saddle_preconditioner(self, value): """Set the Schur complement preconditioner.""" self._needs_function_rewire = True symval = sympify(value) self._saddle_preconditioner = symval @property def penalty(self): """Augmented Lagrangian penalty parameter.""" return self._penalty @penalty.setter def penalty(self, value): """Set the augmented Lagrangian penalty parameter.""" self._needs_function_rewire = True self._penalty.sym = value
[docs] @timing.routine_timer_decorator @memprobe.instrument("NavierStokes.solve") def solve( self, zero_init_guess: bool = True, timestep: float = None, _force_setup: bool = False, verbose=False, _evalf=False, order=None, divergence_retries: int = 0, ): """ Generates solution to constructed system. Params ------ zero_init_guess: If `True`, a zero initial guess will be used for the system solution. Otherwise, the current values of `self.u` will be used. divergence_retries: If SNES reports DIVERGED, retry with warm start up to this many times. 0 preserves legacy behaviour. """ if order is None or order > self._order: order = self._order if timestep is not None and timestep != self.delta_t: self.delta_t = timestep # this will force an initialisation because the functions need to be updated if _force_setup: self._needs_function_rewire = True if not self.constitutive_model._solver_is_setup: self._needs_function_rewire = True self.DFDt.psi_fn = self.constitutive_model.flux.T if not self.is_setup: self._setup_pointwise_functions(verbose) self._setup_discretisation(verbose) self._setup_solver(verbose) if uw.mpi.rank == 0 and verbose: print(f"NS solver - pre-solve DuDt update", flush=True) # Update SemiLagrange Flux terms self.DuDt.update_pre_solve(timestep, verbose=verbose, evalf=_evalf) self.DFDt.update_pre_solve(timestep, verbose=verbose, evalf=_evalf) # Override AM coefficients if flux_order is explicitly set if self._flux_order is not None: from underworld3.systems.ddt import _update_am_values fo = min(self._flux_order, self.DFDt.effective_order) _update_am_values(self.DFDt._am_coeffs, fo, 0.5) if uw.mpi.rank == 0 and verbose: print(f"NS solver - solve Stokes flow", flush=True) super().solve( zero_init_guess, _force_setup=_force_setup, verbose=verbose, picard=0, divergence_retries=divergence_retries, ) if uw.mpi.rank == 0 and verbose: print(f"NS solver - post-solve DuDt update", flush=True) self.DuDt.update_post_solve(timestep, verbose=verbose, evalf=_evalf) self.DFDt.update_post_solve(timestep, verbose=verbose, evalf=_evalf) self.is_setup = True self.constitutive_model._solver_is_setup = True return
[docs] @timing.routine_timer_decorator def estimate_dt(self): r""" Estimate an appropriate timestep for the Navier-Stokes solver. This is an implicit solver, so the returned :math:`\delta t` should be interpreted as: - :math:`\delta t_{\textrm{diff}}`: typical time for vorticity diffusion across an element - :math:`\delta t_{\textrm{adv}}`: typical element-crossing time for a fluid parcel The Navier-Stokes equations include momentum diffusion via kinematic viscosity :math:`\nu = \eta/\rho`, so the diffusive timestep is computed from this quantity. Returns ------- tuple (:math:`\delta t_{\textrm{diff}}`, :math:`\delta t_{\textrm{adv}}`) """ ### required modules from mpi4py import MPI # For Navier-Stokes, diffusivity is the kinematic viscosity: ν = η/ρ # Use the unified .K property from the constitutive model (returns viscosity) K = self.constitutive_model.K # Evaluate the viscosity (handles constant and spatially-varying cases) if isinstance(K, sympy.Expr) or hasattr(K, 'sym'): K_sym = K.sym if hasattr(K, 'sym') else K if uw.function.fn_is_constant_expr(K_sym): diffusivity = uw.function.evaluate( K_sym, np.zeros((1, self.mesh.dim)), ) else: diffusivity = uw.function.evaluate( sympy.sympify(K_sym), self.mesh._centroids, self.mesh.N, ) diffusivity = diffusivity.max() else: diffusivity = K # If diffusivity is unit-aware (UnitAwareArray), nondimensionalise it to get # consistent nondimensional values that match mesh._radii # Note: .magnitude returns physical units, which would be wrong here if hasattr(diffusivity, "units") and diffusivity.units is not None: diffusivity = uw.non_dimensionalise(diffusivity) elif hasattr(diffusivity, "magnitude"): # Plain UWQuantity without units context - use magnitude diffusivity = diffusivity.magnitude # Ensure diffusivity is a plain numpy scalar max_diffusivity = float(np.asarray(diffusivity).max()) ## get global max diffusivity value comm = uw.mpi.comm diffusivity_glob = comm.allreduce(max_diffusivity, op=MPI.MAX) ### get the velocity values vel = uw.function.evaluate( self.u.sym, self.mesh._centroids, self.mesh.N, ) # If vel is unit-aware (UnitAwareArray), nondimensionalise it to get # consistent nondimensional values that match mesh._radii # Note: .magnitude returns physical units, which would be wrong here if hasattr(vel, "units") and vel.units is not None: vel = uw.non_dimensionalise(vel) elif hasattr(vel, "magnitude"): # Plain UWQuantity without units context - use magnitude vel = vel.magnitude # Ensure vel is a plain numpy array vel = np.asarray(vel) ### get global velocity from velocity field max_magvel = np.linalg.norm(vel, axis=1).max() max_magvel_glob = comm.allreduce(max_magvel, op=MPI.MAX) ## get radius min_dx = self.mesh.get_min_radius() ## estimate dt of adv and diff components dt_diff = np.inf dt_adv = np.inf if diffusivity_glob != 0.0: dt_diff = (min_dx**2) / diffusivity_glob if max_magvel_glob != 0.0: dt_adv = min_dx / max_magvel_glob return ( _apply_unit_aware_scaling(np.squeeze(dt_diff), self.u, self.mesh), _apply_unit_aware_scaling(np.squeeze(dt_adv), self.u, self.mesh), )