r"""
PDE solvers for Underworld3.
This module provides finite element solvers for common PDEs in geodynamics
and computational mechanics. All solvers use PETSc's SNES (Scalable Nonlinear
Equation Solvers) framework.
Scalar Equations
----------------
SNES_Poisson
Poisson/diffusion equation: :math:`\nabla \cdot (k \nabla T) = f`
SNES_Darcy
Darcy flow: pressure-driven flow through porous media
SNES_TransientDarcy
Transient groundwater flow with constant storage
SNES_Richards
Richards equation for variably-saturated porous media flow
Vector Equations
----------------
SNES_Stokes
Stokes flow: :math:`\nabla \cdot \boldsymbol{\tau} - \nabla p = f`
SNES_VE_Stokes
Viscoelastic Stokes with stress history
SNES_Stokes_SaddlePt
Stokes with saddle-point preconditioner
Transport Equations
-------------------
SNES_AdvectionDiffusion
Scalar advection-diffusion with Lagrangian tracking
Notes
-----
Solvers support unit-aware calculations when reference quantities are
set on the model. Physical units are preserved through the solve process
and timestep estimates are returned with appropriate time units.
See Also
--------
underworld3.systems.ddt : Time derivative schemes for transient problems.
underworld3.constitutive_models : Material constitutive laws.
Examples
--------
Set up a Stokes solver:
>>> stokes = uw.systems.Stokes(mesh, velocityField=v, pressureField=p)
>>> stokes.constitutive_model = uw.constitutive_models.ViscousFlowModel
>>> stokes.viscosity = 1e21
>>> stokes.solve()
"""
import sympy
from sympy import sympify
import numpy as np
from typing import Optional, Callable, Union
import underworld3 as uw
from underworld3.systems import SNES_Scalar, SNES_Vector, SNES_Stokes_SaddlePt
from underworld3.cython.generic_solvers import SNES_MultiComponent
from underworld3 import VarType
import underworld3.timing as timing
from underworld3.utilities import memprobe
from underworld3.utilities._api_tools import (
uw_object,
SymbolicProperty,
Parameter,
Template,
ExpressionProperty,
)
from underworld3.function import expression as public_expression
expression = lambda *x, **X: public_expression(*x, _unique_name_generation=True, **X)
def _apply_unit_aware_scaling(dt_nondimensional, field, mesh):
"""
Helper function to apply unit-aware scaling to timestep estimates.
Detects the units of the velocity field and applies appropriate time scaling
to convert nondimensional timestep to physical time units.
Parameters
----------
dt_nondimensional : float or np.ndarray
The nondimensional timestep estimate
field : MeshVariable or SymPy expression (often a Matrix)
The velocity field - units are detected from this
mesh : Mesh
The mesh (may have reference to model with time scales)
Returns
-------
float or UWQuantity
Timestep with physical time units if detectable, otherwise nondimensional
"""
try:
from ..function.quantities import UWQuantity
from ..units import get_units
import sympy
# Extract a component from field if it's a Matrix (common for velocity)
field_to_check = field
if isinstance(field, sympy.MatrixBase):
# Extract first component: V_fn[0] or V_fn[0,0]
if field.shape[0] > 0:
field_to_check = field[0] if len(field.shape) == 1 else field[0, 0]
# Try to get units from the field expression
field_units = get_units(field_to_check)
if field_units is not None:
# Field has units - verify it has time dimension (as expected for velocity)
# Get dimensionality: e.g., {'[length]': 1, '[time]': -1} for velocity
field_dimensionality = field_units.dimensionality
# Check if this has time dimension (velocity should have time^-1)
if '[time]' in field_dimensionality:
# Velocity field has time dimension - use model time scale for result
# Don't try to match the velocity's specific time units (fragile string parsing)
# Instead, always return in model's fundamental time scale and let user convert
model = uw.get_default_model()
if model and hasattr(model, "fundamental_scales"):
model_time_scale = model.fundamental_scales.get("time")
if model_time_scale is not None:
# Apply scaling: dt_physical = dt_nd * time_scale
dt_physical = dt_nondimensional * model_time_scale
# Return as UWQuantity
if not isinstance(dt_physical, UWQuantity):
dt_physical = UWQuantity._from_pint(dt_physical)
return dt_physical
# Fallback: check if model has time scales (old behavior)
model = uw.get_default_model()
if model and hasattr(model, "fundamental_scales") and model.fundamental_scales:
time_scale = model.fundamental_scales.get("time")
if time_scale is not None:
dt_physical = dt_nondimensional * time_scale
if not isinstance(dt_physical, UWQuantity):
dt_physical = UWQuantity._from_pint(dt_physical)
return dt_physical
except Exception as e:
# Silently fall back to nondimensional
pass
return dt_nondimensional
from .ddt import SemiLagrangian as SemiLagrangian_DDt
from .ddt import Lagrangian as Lagrangian_DDt
from .ddt import Eulerian as Eulerian_DDt
from .ddt import Symbolic as Symbolic_DDt
[docs]
class SNES_Poisson(SNES_Scalar):
r"""Poisson equation solver.
Provides a discrete representation of the Poisson equation:
.. math::
\nabla \cdot \left[ \boldsymbol{\kappa} \nabla u \right] = f
where :math:`\mathbf{F} = \boldsymbol{\kappa} \nabla u` relates the flux to
gradients in the unknown :math:`u`.
Parameters
----------
mesh : Mesh
The computational mesh.
u_Field : MeshVariable, optional
Pre-existing mesh variable for the solution. If None, one is created.
verbose : bool, optional
Enable verbose output during solve.
degree : int, optional
Polynomial degree for the solution field (default: 2).
DuDt : SemiLagrangian_DDt or Lagrangian_DDt, optional
Time derivative operator for time-dependent problems.
DFDt : SemiLagrangian_DDt or Lagrangian_DDt, optional
Time derivative operator for the flux.
"""
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
u_Field: uw.discretisation.MeshVariable = None,
verbose=False,
degree=2,
DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
):
## Keep track
## Parent class will set up default values etc
super().__init__(
mesh,
u_Field,
degree,
verbose,
DuDt=DuDt,
DFDt=DFDt,
)
# default values for properties
self.f = sympy.Matrix.zeros(1, 1)
self._constitutive_model = None
# =========================================================================
# PETSc Residual Templates
# For the Poisson equation: -∇·(k∇u) = f
# =========================================================================
F0 = Template(
r"f_0 \left( \mathbf{u} \right)",
lambda self: -self.f,
r"""Source term for the Poisson equation (pointwise).
The $f_0$ term represents the source/sink in the diffusion equation.
Set via the ``f`` property (e.g., ``poisson.f = heat_source``).
""",
)
F1 = Template(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
lambda self: self.constitutive_model.flux.T,
r"""Diffusive flux term for the Poisson equation (pointwise).
The $\mathbf{F}_1$ vector represents the flux $k \nabla u$
from the constitutive model, where $k$ is the diffusivity.
""",
)
[docs]
@timing.routine_timer_decorator
def poisson_problem_description(self):
"""Build residual terms for Poisson FEM assembly."""
# f1 residual term (weighted integration) - scalar function
self._f0 = self.F0.sym
# f1 residual term (integration by parts / gradients)
# isotropic
self._f1 = self.F1.sym
return
@property
def f(self):
r"""Source term for the Poisson equation.
The source term :math:`f` appears on the right-hand side:
.. math::
\nabla \cdot (\kappa \nabla u) = f
Returns
-------
sympy.Matrix
Source term expression (scalar, shape ``(1, 1)``).
See Also
--------
constitutive_model : Provides the diffusivity :math:`\kappa`.
"""
return self._f
@f.setter
def f(self, value):
"""Set the source term (handles units and scaling)."""
self._needs_function_rewire = True
# Handle UWQuantity with units - enforce "units everywhere" principle
if hasattr(value, "value") and hasattr(value, "units"):
# Extract the plain value
plain_value = float(value.value)
# If ND scaling is active, scale the constant
import underworld3 as uw
if uw.is_nondimensional_scaling_active():
# The source term should have same dimensionality as the unknown field
# Access via self.Unknowns.u (Poisson) or self.Unknowns.DuDt.u (Stokes)
u = None
if hasattr(self, "Unknowns") and hasattr(self.Unknowns, "u"):
u = self.Unknowns.u
if u is not None and hasattr(u, "scaling_coefficient"):
scale = u.scaling_coefficient
if scale != 1.0 and scale != 0.0:
plain_value = plain_value / scale
self._f = sympy.Matrix((plain_value,))
# Also accept plain SymPy expressions (for variables, etc.)
elif hasattr(value, "atoms"): # It's a SymPy expression
self._f = sympy.Matrix((value,))
# Handle plain numbers - apply "units everywhere or nowhere" principle
# Exception: allow zero always (dimensionless/no source)
elif isinstance(value, (int, float)):
if value == 0.0 or value == 0:
# Zero is allowed - no source term
self._f = sympy.Matrix((0.0,))
else:
# Check if model has reference quantities defined
# If yes: enforce units everywhere
# If no: allow plain numbers (user is responsible for consistency)
import underworld3 as uw
model = uw.get_default_model()
if model.has_units():
# Reference quantities defined - enforce units everywhere
raise ValueError(
f"Units requirement enforced: Model has reference quantities defined.\n"
f"Source term 'f' requires units. Use:\n"
f" solver.f = uw.quantity({value}, 'appropriate_units')\n"
f"Example: poisson.f = uw.quantity({value}, 'kelvin')"
)
else:
# No reference quantities - allow plain numbers
# User is responsible for dimensional consistency
self._f = sympy.Matrix((float(value),))
else:
# Accept other types (might be symbolic)
self._f = sympy.Matrix((value,))
@property
def CM_is_setup(self):
"""Whether the constitutive model is configured for this solver."""
return self._constitutive_model._solver_is_setup
[docs]
class SNES_Darcy(SNES_Scalar):
r"""
Darcy flow equation solver for groundwater problems.
Provides a discrete representation of the groundwater flow equations:
.. math::
\underbrace{S_s \frac{\partial h}{\partial t}}_{\dot{u}}
- \nabla \cdot \underbrace{\left[ \boldsymbol{\kappa} \nabla h
- \boldsymbol{s} \right]}_{\mathbf{F}}
= \underbrace{W}_{h}
The flux term :math:`\mathbf{F}` relates the effective velocity to
pressure gradients:
.. math::
\boldsymbol{v} = \boldsymbol{\kappa} \nabla h - \boldsymbol{s}
Parameters
----------
mesh : Mesh
The computational mesh.
h_Field : MeshVariable, optional
Mesh variable for hydraulic head. Created automatically if not provided.
v_Field : MeshVariable, optional
Mesh variable for Darcy velocity. Created automatically if not provided.
degree : int, default=2
Polynomial degree for the finite element discretization.
verbose : bool, default=False
Enable verbose output.
DuDt : optional
Time derivative operator for the unknown.
DFDt : optional
Time derivative operator for the flux.
Notes
-----
- The unknown is :math:`h`, the hydraulic head
- The permeability tensor :math:`\kappa` is set via the ``constitutive_model``
property using one of the ``uw.constitutive_models`` classes
- :math:`W` is a pressure source term
- :math:`S_s` is the specific storage coefficient
- The time-dependent term :math:`\dot{f}` is not implemented in this version
- The solver returns both the primary field and the Darcy flux (mean-flow velocity)
See Also
--------
SNES_Poisson : Related diffusion-only solver.
uw.constitutive_models.DarcyFlowModel : Constitutive model for Darcy flow.
"""
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
h_Field: Optional[uw.discretisation.MeshVariable] = None,
v_Field: Optional[uw.discretisation.MeshVariable] = None,
degree: int = 2,
verbose=False,
DuDt=None,
DFDt=None,
):
## Parent class will set up default values etc
super().__init__(
mesh,
h_Field,
degree,
verbose,
DuDt=DuDt,
DFDt=DFDt,
)
# default values for properties
self._f = sympy.Matrix([0])
self._k = 1
self._constitutive_model = None
## Set up the projection operator that
## solves the flow rate
self._v_projector = uw.systems.solvers.SNES_Vector_Projection(
self.mesh,
v_Field,
degree=self.Unknowns.u.degree - 1,
)
self._v = self._v_projector.Unknowns.u
# If we add smoothing, it should be small
self._v_projector.smoothing = 1.0e-6
# =========================================================================
# PETSc Residual Templates
# For Darcy flow: -∇·(K∇p) = f, with velocity v = -K∇p
# =========================================================================
F0 = Template(
r"f_0 \left( \mathbf{u} \right)",
lambda self: -self.f,
r"""Source term for the Darcy pressure equation (pointwise).
The $f_0$ term represents fluid sources/sinks.
Set via the ``f`` property.
""",
)
F1 = Template(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
lambda self: self.darcy_flux,
r"""Darcy flux term (pointwise).
The $\mathbf{F}_1$ vector is the Darcy flux $K \nabla p$,
where $K$ is the permeability.
""",
)
[docs]
@timing.routine_timer_decorator
def darcy_problem_description(self):
"""Build residual terms for Darcy flow FEM assembly."""
# f1 residual term (weighted integration)
self._f0 = self.F0.sym
# f1 residual term (integration by parts / gradients)
self._f1 = self.F1.sym
# Flow calculation
self._v_projector.uw_function = -self.F1.sym
return
@property
def f(self):
"""Source term for the Darcy equation."""
return self._f
@f.setter
def f(self, value):
"""Set the source term."""
self._needs_function_rewire = True
self._f = sympy.Matrix((value,))
@property
def darcy_flux(self):
"""Darcy flux velocity computed from pressure gradient."""
flux = self.constitutive_model.flux.T
return flux
@property
def v(self):
"""Projected Darcy velocity field."""
return self._v
@v.setter
def v(self, value):
"""Set the velocity projection target."""
self._v_projector.is_setup = False
self._v = sympify(value)
[docs]
@timing.routine_timer_decorator
def solve(
self,
zero_init_guess: bool = True,
timestep: float = None,
verbose: bool = False,
_force_setup: bool = False,
divergence_retries: int = 0,
):
r"""Solve the Darcy flow system.
Computes the pressure field and Darcy flux velocity.
Parameters
----------
zero_init_guess : bool, optional
If True (default), start from zero initial guess.
If False, use current field values as initial guess.
timestep : float, optional
Timestep value for inertial terms (if applicable).
verbose : bool, optional
If True, print solver progress information.
_force_setup : bool, optional
Force re-setup of solver even if already configured.
divergence_retries : int, optional
If SNES reports DIVERGED, retry with warm start up to this
many times. 0 preserves legacy behaviour.
Notes
-----
After solving, the pressure field ``self.u`` and velocity field
``self.v`` contain the solution.
"""
if (not self.is_setup) or _force_setup:
self._setup_pointwise_functions(verbose)
self._setup_discretisation(verbose)
self._setup_solver(verbose)
# Solve pressure
super().solve(zero_init_guess, _force_setup,
divergence_retries=divergence_retries)
# Now solve flow field: v = -flux = -K(grad(h) - s)
# self._v_projector.petsc_options["snes_rtol"] = 1.0e-6
# self._v_projector.petsc_options.delValue("ksp_monitor")
self._v_projector.uw_function = -self.darcy_flux
self._v_projector.solve(zero_init_guess)
return
# @timing.routine_timer_decorator
# def _setup_terms(self):
# self._v_projector.uw_function = self.darcy_flux
# self._v_projector._setup_terms() # _setup_pointwise_functions() #
# super()._setup_terms()
class SNES_TransientDarcy(SNES_Darcy):
r"""
Transient Darcy flow solver for time-dependent groundwater problems.
Solves the transient groundwater flow equation:
.. math::
S_s \frac{\partial h}{\partial t}
- \nabla \cdot \left[ K (\nabla h - \mathbf{s}) \right] = f
where :math:`S_s` is the specific storage (constant), :math:`K` is the
hydraulic conductivity, :math:`\mathbf{s}` is the body force (gravity),
and :math:`f` is a source/sink term.
Inherits velocity projection from :class:`SNES_Darcy`.
Parameters
----------
mesh : Mesh
The computational mesh.
h_Field : MeshVariable, optional
Mesh variable for hydraulic head.
v_Field : MeshVariable, optional
Mesh variable for Darcy velocity.
order : int, default=1
Time integration order (BDF/Adams-Moulton history depth).
theta : float, default=0.5
Implicit time weighting (0=explicit, 0.5=Crank-Nicolson, 1=implicit).
degree : int, default=2
Polynomial degree for finite element discretization.
verbose : bool, default=False
Enable verbose output.
DuDt : optional
Time derivative operator for the unknown.
DFDt : optional
Time derivative operator for the flux.
Attributes
----------
storage : sympy.Expr
Specific storage :math:`S_s` (default 1).
delta_t : UWexpression
Current timestep.
See Also
--------
SNES_Darcy : Steady-state parent solver.
SNES_Richards : Nonlinear extension for unsaturated flow.
"""
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
h_Field: Optional[uw.discretisation.MeshVariable] = None,
v_Field: Optional[uw.discretisation.MeshVariable] = None,
order: int = 1,
theta: float = 0.5,
degree: int = 2,
verbose=False,
DuDt=None,
DFDt=None,
):
super().__init__(mesh, h_Field, v_Field, degree, verbose, DuDt=DuDt, DFDt=DFDt)
self.theta = theta
self._delta_t = expression(R"\Delta t", 0, "Physically motivated timestep")
self._storage = sympy.sympify(1)
self._needs_function_rewire = True
if DuDt is None:
self.Unknowns.DuDt = Eulerian_DDt(
self.mesh,
h_Field,
vtype=uw.VarType.SCALAR,
degree=h_Field.degree,
continuous=h_Field.continuous,
varsymbol=h_Field.symbol,
verbose=verbose,
order=order,
smoothing=0.0,
)
else:
if order is not None and DuDt.order < order:
raise RuntimeError(
f"DuDt supplied is order {DuDt.order} but order requested is {order}"
)
self.Unknowns.DuDt = DuDt
if DFDt is None:
self.Unknowns.DFDt = Symbolic_DDt(
sympy.Matrix([[0] * self.mesh.dim]),
varsymbol=rf"{{F[ {self.u.symbol} ] }}",
theta=theta,
order=order,
)
else:
self.Unknowns.DFDt = DFDt
# --- Properties ---
@property
def storage(self):
"""Specific storage coefficient :math:`S_s`."""
return self._storage
@storage.setter
def storage(self, value):
self._needs_function_rewire = True
self._storage = sympy.sympify(value)
@property
def delta_t(self):
"""Current timestep."""
return self._delta_t
@delta_t.setter
def delta_t(self, value):
self._needs_function_rewire = True
if hasattr(value, "value"):
self._delta_t.sym = value.value
elif hasattr(value, "magnitude"):
self._delta_t.sym = value.magnitude
else:
self._delta_t.sym = value
@property
def DuDt(self):
return self.Unknowns.DuDt
@property
def DFDt(self):
return self.Unknowns.DFDt
# --- Template expressions (override Darcy's steady-state Templates) ---
@property
def F0(self):
"""Pointwise storage + source: :math:`S_s \\dot{h} / \\Delta t - f`."""
f0 = expression(
r"f_0(h)",
-self.f + self.storage * self.DuDt.bdf() / self.delta_t,
"Transient Darcy storage + source term",
)
self._f0 = f0
return f0
@property
def F1(self):
"""Pointwise flux: Adams-Moulton time-weighted Darcy flux."""
F1_val = expression(
r"\mathbf{F}_1(h)",
self.DFDt.adams_moulton_flux(),
"Transient Darcy flux (time-weighted)",
)
self._f1 = F1_val
return F1_val
# --- Timestep estimation ---
@timing.routine_timer_decorator
def estimate_dt(self):
r"""
Estimate a stable timestep based on diffusive CFL.
Returns
-------
float or pint.Quantity
Diffusive timestep :math:`\delta t = (\Delta x)^2 / K_{\max}`.
"""
from mpi4py import MPI
K = self.constitutive_model.K
if isinstance(K, sympy.Expr) or hasattr(K, "sym"):
K_sym = K.sym if hasattr(K, "sym") else K
if uw.function.fn_is_constant_expr(K_sym):
diffusivity = uw.function.evaluate(
K_sym, np.zeros((1, self.mesh.dim))
)
else:
diffusivity = uw.function.evaluate(
sympy.sympify(K_sym), self.mesh._centroids, self.mesh.N
)
diffusivity = diffusivity.max()
else:
diffusivity = K
if hasattr(diffusivity, "units") and diffusivity.units is not None:
diffusivity = uw.non_dimensionalise(diffusivity)
elif hasattr(diffusivity, "magnitude"):
diffusivity = diffusivity.magnitude
diffusivity = float(np.asarray(diffusivity).max())
comm = uw.mpi.comm
diffusivity_glob = comm.allreduce(diffusivity, op=MPI.MAX)
min_dx = self.mesh.get_min_radius()
if diffusivity_glob != 0.0:
dt_diff = (min_dx**2) / diffusivity_glob
else:
dt_diff = np.inf
return _apply_unit_aware_scaling(np.squeeze(dt_diff), self.u, self.mesh)
# --- Solve ---
@timing.routine_timer_decorator
def solve(
self,
zero_init_guess: bool = True,
timestep=None,
_force_setup: bool = False,
verbose=False,
divergence_retries: int = 0,
):
r"""
Solve the transient Darcy system for one timestep.
Parameters
----------
zero_init_guess : bool, optional
Start from zero initial guess (default True).
timestep : float, optional
Timestep size. Updates ``self.delta_t`` if provided.
_force_setup : bool, optional
Force re-setup of solver.
verbose : bool, optional
Print solver progress.
divergence_retries : int, optional
If SNES reports DIVERGED, retry with warm start up to this
many times. 0 preserves legacy behaviour.
"""
if timestep is not None and timestep != self.delta_t:
self.delta_t = timestep
if not self.constitutive_model._solver_is_setup:
self._needs_function_rewire = True
self.DFDt.psi_fn = self.constitutive_model.flux.T
if not self.is_setup:
self._setup_pointwise_functions(verbose)
self._setup_discretisation(verbose)
self._setup_solver(verbose)
# Pre-solve: update history terms
self.DuDt.update_pre_solve(timestep, verbose=verbose)
self.DFDt.update_pre_solve(timestep, verbose=verbose)
# Solve PDE (bypass SNES_Darcy.solve to avoid double setup/projection)
SNES_Scalar.solve(self, zero_init_guess, _force_setup,
divergence_retries=divergence_retries)
# Invalidate cached data views
target_var = getattr(self.u, "_base_var", self.u)
if hasattr(target_var, "_canonical_data"):
target_var._canonical_data = None
# Post-solve: shift history
self.DuDt.update_post_solve(timestep, verbose=verbose)
self.DFDt.update_post_solve(timestep, verbose=verbose)
# Velocity projection (inherited from Darcy)
self._v_projector.uw_function = self.darcy_flux
self._v_projector.solve(zero_init_guess)
self.is_setup = True
self.constitutive_model._solver_is_setup = True
return
class SNES_Richards(SNES_TransientDarcy):
r"""
Richards equation solver for variably-saturated porous media flow.
Two formulations are supported:
**Mixed form** (mass-conservative, preferred) — set ``water_content``:
.. math::
\frac{\partial \theta}{\partial t}
- \nabla \cdot \left[ K(\psi) (\nabla \psi - \mathbf{s}) \right] = f
discretised as :math:`(\theta(\psi^{n+1}) - \theta(\psi^n)) / \Delta t`,
which is exactly conservative by construction (Celia et al., 1990).
**Head-based form** (backward compatible) — set ``capacity``:
.. math::
C(\psi) \frac{\partial \psi}{\partial t}
- \nabla \cdot \left[ K(\psi) (\nabla \psi - \mathbf{s}) \right] = f
where :math:`C(\psi) = d\theta/d\psi` is the specific moisture capacity.
Parameters
----------
mesh : Mesh
The computational mesh.
psi_Field : MeshVariable, optional
Mesh variable for pressure head :math:`\psi`.
v_Field : MeshVariable, optional
Mesh variable for Darcy velocity.
order : int, default=1
Time integration order.
theta : float, default=0.5
Implicit time weighting.
degree : int, default=2
Polynomial degree.
verbose : bool, default=False
Enable verbose output.
DuDt : optional
Time derivative operator for the unknown.
DFDt : optional
Time derivative operator for the flux.
Attributes
----------
water_content : sympy.Expr or None
Water content function :math:`\theta(\psi)` for the mixed form.
When set, the storage term uses
:math:`(\theta(\psi^{n+1}) - \theta(\psi^n)) / \Delta t`.
Takes precedence over ``capacity`` if both are set.
capacity : sympy.Expr
Specific moisture capacity :math:`C(\psi)` for the head-based form.
Used only when ``water_content`` is None.
psi : MeshVariable
Alias for ``self.u`` (pressure head).
See Also
--------
SNES_TransientDarcy : Linear parent solver.
underworld3.utilities.retention_curves : Van Genuchten and Gardner functions.
Examples
--------
Mixed form (preferred):
>>> from underworld3.utilities.retention_curves import (
... van_genuchten_theta, van_genuchten_K,
... )
>>> richards = uw.systems.Richards(mesh, psi_Field=psi, v_Field=v)
>>> richards.constitutive_model = uw.constitutive_models.DarcyFlowModel
>>> richards.constitutive_model.Parameters.permeability = van_genuchten_K(
... psi.sym[0], Ks=1e-4, alpha=3.35, n=2.0,
... )
>>> richards.water_content = van_genuchten_theta(
... psi.sym[0], theta_r=0.045, theta_s=0.43, alpha=3.35, n=2.0,
... )
Head-based form (backward compatible):
>>> from underworld3.utilities.retention_curves import van_genuchten_C
>>> richards.capacity = van_genuchten_C(
... psi.sym[0], theta_r=0.045, theta_s=0.43, alpha=3.35, n=2.0,
... )
"""
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
psi_Field: Optional[uw.discretisation.MeshVariable] = None,
v_Field: Optional[uw.discretisation.MeshVariable] = None,
order: int = 1,
theta: float = 0.5,
degree: int = 2,
verbose=False,
DuDt=None,
DFDt=None,
):
super().__init__(
mesh, psi_Field, v_Field, order, theta, degree, verbose, DuDt, DFDt
)
self._capacity = sympy.sympify(1)
self._water_content = None # None → head-based form; set → mixed form
@property
def water_content(self):
r"""Water content function :math:`\theta(\psi)` for the mixed form.
When set, the storage term uses
:math:`(\theta(\psi^{n+1}) - \theta(\psi^n)) / \Delta t`
instead of :math:`C(\psi) \cdot (\psi^{n+1} - \psi^n) / \Delta t`,
giving exact mass conservation (Celia et al., 1990).
The expression should be a SymPy function of ``psi.sym[0]``.
The Jacobian :math:`\partial\theta/\partial\psi = C(\psi)` is
computed automatically by PETSc (finite-difference colouring),
so there is no need to provide :math:`C(\psi)` separately.
"""
return self._water_content
@water_content.setter
def water_content(self, value):
self._needs_function_rewire = True
self._water_content = sympy.sympify(value) if value is not None else None
@property
def capacity(self):
r"""Specific moisture capacity :math:`C(\psi) = d\theta/d\psi`.
Used only when ``water_content`` is None (head-based form).
Typically a nonlinear SymPy expression depending on ``psi.sym[0]``.
"""
return self._capacity
@capacity.setter
def capacity(self, value):
self._needs_function_rewire = True
self._capacity = sympy.sympify(value)
@property
def psi(self):
"""Alias for ``self.u`` (pressure head)."""
return self.u
@property
def F0(self):
r"""Pointwise storage + source term.
Mixed form: :math:`(\theta(\psi^{n+1}) - \theta(\psi^n)) / \Delta t - f`
Head-based form: :math:`C(\psi) (\psi^{n+1} - \psi^n) / \Delta t - f`
"""
if self._water_content is not None:
# Mixed (mass-conservative) form:
# θ(ψ^{n+1}) is self._water_content (already in terms of psi.sym[0])
# θ(ψ^n) is water_content with psi.sym[0] → psi_star[0].sym[0]
psi_sym = self.psi.sym[0]
psi_star_sym = self.DuDt.psi_star[0].sym[0]
theta_new = self._water_content
theta_old = self._water_content.subs(psi_sym, psi_star_sym)
storage_term = sympy.Matrix([[theta_new - theta_old]]) / self.delta_t
else:
# Head-based form (backward compatible)
storage_term = self.capacity * self.DuDt.bdf() / self.delta_t
f0 = expression(
r"f_0(\psi)",
-self.f + storage_term,
"Richards storage + source term",
)
self._f0 = f0
return f0
## --------------------------------
## Stokes saddle point solver plus
## ancilliary functions - note that
## we need to update the description
## of the generic saddle pt solver
## to remove the Stokes-specific stuff
## --------------------------------
[docs]
class SNES_Stokes(SNES_Stokes_SaddlePt):
r"""
Stokes equation solver for incompressible viscous flow.
This class provides functionality for a discrete representation
of the Stokes flow equations assuming an incompressibility
(or near-incompressibility) constraint.
The momentum equation is:
.. math::
-\nabla \cdot \left[ \boldsymbol{\tau} - p \mathbf{I} \right] = \mathbf{f}
with the incompressibility constraint:
.. math::
\nabla \cdot \mathbf{u} = 0
The flux term is a deviatoric stress (:math:`\boldsymbol{\tau}`) related to velocity
gradients (:math:`\nabla \mathbf{u}`) through a viscosity tensor :math:`\eta`, and a
volumetric (pressure) part :math:`p`:
.. math::
\boldsymbol{\tau} = \frac{\eta}{2}\left( \nabla \mathbf{u} + \nabla \mathbf{u}^T \right)
The constraint equation gives incompressible flow by default but can be set
to any function of the unknown :math:`\mathbf{u}` and :math:`\nabla \cdot \mathbf{u}`.
Parameters
----------
mesh : uw.discretisation.Mesh
The computational mesh.
velocityField : uw.discretisation.MeshVariable, optional
Pre-existing velocity field. If None, one is created automatically.
pressureField : uw.discretisation.MeshVariable, optional
Pre-existing pressure field. If None, one is created automatically.
degree : int, optional
Polynomial degree for velocity interpolation. Default is 2.
p_continuous : bool, optional
If True (default), pressure is continuous. Set False for discontinuous pressure.
verbose : bool, optional
Enable verbose output during solving. Default is False.
DuDt : SemiLagrangian_DDt or Lagrangian_DDt, optional
Material derivative operator for velocity (used in derived classes).
DFDt : SemiLagrangian_DDt or Lagrangian_DDt, optional
Material derivative operator for flux (used in viscoelastic models).
Notes
-----
**Viscosity model**: The viscosity tensor :math:`\boldsymbol{\eta}` is provided by
setting the ``constitutive_model`` property to one of the ``uw.constitutive_models``
classes. It may be constant, spatially varying, non-linear, or anisotropic.
**Augmented Lagrangian**: Setting ``penalty`` to a non-zero value adds
:math:`\lambda \nabla \cdot \mathbf{u}` to the weak form, improving convergence
for incompressible flow (in addition to the constraint equation).
**Mixed finite elements**: The pressure field interpolation order determines
the integration order of the mixed method and is typically lower than the
velocity field order.
**Viscoelastic models**: For viscoelastic behaviour, the flux term contains
stress history tracked on a particle swarm. See :class:`SNES_VE_Stokes`.
See Also
--------
SNES_VE_Stokes : Viscoelastic Stokes solver with flux history.
SNES_NavierStokes : Navier-Stokes solver with inertial terms.
uw.constitutive_models : Available viscosity models.
Examples
--------
>>> import underworld3 as uw
>>> mesh = uw.meshing.UnstructuredSimplexBox(minCoords=(0,0), maxCoords=(1,1), cellSize=0.1)
>>> stokes = uw.systems.Stokes(mesh, degree=2)
>>> stokes.constitutive_model = uw.constitutive_models.ViscousFlowModel()
>>> stokes.constitutive_model.Parameters.viscosity = 1.0
>>> stokes.bodyforce = [0, -1] # gravity
>>> stokes.solve()
"""
instances = 0
[docs]
def __init__(
self,
mesh: uw.discretisation.Mesh,
velocityField: Optional[uw.discretisation.MeshVariable] = None,
pressureField: Optional[uw.discretisation.MeshVariable] = None,
degree: Optional[int] = 2,
p_continuous: Optional[bool] = True,
verbose: Optional[bool] = False,
# Not used in Stokes, but may be used in NS, VE etc
DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
):
super().__init__(
mesh,
velocityField,
pressureField,
degree,
p_continuous,
verbose,
DuDt=DuDt,
DFDt=DFDt,
)
self._degree = degree
# User-facing operations are matrices / vectors by preference
self._Estar = None
self._penalty = expression(R"\uplambda", 0, "Numerical Penalty")
self._constraints = sympy.Matrix((self.div_u,)) # by default, incompressibility constraint
self._bodyforce = expression(
Rf"\mathbf{{f}}_0\left( {self.Unknowns.u.symbol} \right)",
sympy.Matrix([[0] * self.mesh.dim]),
"Stokes pointwise force term: f_0(u)",
)
# this attrib records if we need to setup the problem (again)
self._needs_function_rewire = True
self._constitutive_model = None
# Optional: alternative F1 expression to autodiff for the
# Jacobian. When None, autodiff F1 itself (default). Used for
# inexact-Newton tricks like a smooth-Jacobian / sharp-residual
# split at a VEP yield kink. See ``set_jacobian_F1_source``.
self._F1_jacobian_source = None
return
[docs]
def set_jacobian_F1_source(self, F1_source, linesearch="cp"):
r"""Override the F1 expression used to build the Jacobian blocks.
By default, the Stokes Jacobian's uu / up G2, G3 blocks are
autodiff'd from the residual F1. Some problems benefit from
differentiating a *different* but related expression — e.g. a
smooth (softmin) viscosity formula for the Jacobian while the
residual F1 keeps a sharp Min, so Newton sees a continuous
derivative even when the iterate sits exactly on the yield kink.
Setting ``F1_source`` triggers a JIT recompile (the Jacobian
symbols change). Pass ``None`` to revert to autodiff of F1.
Parameters
----------
F1_source : sympy.Matrix or None
Alternative expression of the same shape as ``F1.sym``.
linesearch : str or None, default ``"cp"``
SNES linesearch type to install when ``F1_source`` is set.
Defaults to ``"cp"`` (critical-point) because inexact-Newton
steps don't reliably reduce the residual norm and PETSc's
default ``bt`` (backtracking) consequently rejects useful
steps with ``DIVERGED_LINE_SEARCH``. ``cp`` accepts the
predicted step at the optimum of the local linearisation and
converges cleanly on the same problems where ``bt`` flails.
Set to ``None`` to leave the linesearch type untouched (e.g.
if you've already configured one via ``petsc_options``).
Has no effect when ``F1_source is None``.
"""
self._F1_jacobian_source = F1_source
self._needs_function_rewire = True
if F1_source is not None and linesearch is not None:
self.petsc_options["snes_linesearch_type"] = linesearch
def _create_stress_history_ddt(self, order=2):
"""Create DFDt for stress history tracking (VE/VEP models).
Called automatically when a constitutive model with
``requires_stress_history = True`` is assigned. Can also be called
explicitly to pre-create the DFDt with a specific order.
Constitutive models can inject extra SemiLagrangian kwargs via the
``stress_history_ddt_kwargs`` property — used e.g. by
``MaxwellExponentialFlowModel`` to set ``with_forcing_history=True``.
"""
if self.Unknowns.DFDt is not None:
return # already created
self._order = order
# Constitutive model may request extra SemiLagrangian kwargs (e.g.
# with_forcing_history for ETD-2 integration).
cm = getattr(self, "constitutive_model", None)
ddt_kwargs = {}
if cm is not None:
ddt_kwargs = dict(getattr(cm, "stress_history_ddt_kwargs", {}))
self.Unknowns.DFDt = uw.systems.ddt.SemiLagrangian(
self.mesh,
sympy.Matrix.zeros(self.mesh.dim, self.mesh.dim),
self.u.sym,
vtype=uw.VarType.SYM_TENSOR,
degree=self.u.degree - 1,
continuous=True,
varsymbol=rf"{{F[ {self.u.symbol} ] }}",
verbose=self.verbose,
bcs=None,
order=order,
smoothing=0.0001,
**ddt_kwargs,
)
# Stress flux = 2·viscosity·E_eff references psi_star[0] in E_eff's
# history term — without snapshot substitution the projection of
# flux→psi_star[0] becomes implicit in psi_star[0] and Min-mode at
# yield admits the wrong fixed point under timestep change.
self.Unknowns.DFDt.enable_source_snapshot()
[docs]
@timing.routine_timer_decorator
@memprobe.instrument("Stokes.solve")
def solve(
self,
zero_init_guess: bool = True,
timestep: float = None,
_force_setup: bool = False,
verbose: bool = False,
debug: bool = False,
debug_name: str = None,
evalf=False,
order=None,
picard: int = 0,
divergence_retries: int = 0,
):
"""Solve the Stokes system, with optional viscoelastic stress history.
When a constitutive model with stress history is active (DFDt is not None),
the solve includes pre/post hooks for advecting stress history, updating
BDF coefficients, and projecting the actual stress after the solve.
Parameters
----------
zero_init_guess : bool
If True, use zero initial guess. Otherwise use current field values.
timestep : float, optional
Advection timestep. Required when stress history is active.
_force_setup : bool
Force rebuild of pointwise functions.
verbose : bool
Enable verbose output.
evalf : bool
Force numerical evaluation during history updates.
order : int, optional
Override the VE time integration order.
picard : int, default=0
Number of Picard iterations before switching to Newton.
Picard uses a simplified Jacobian and can help convergence
for strongly nonlinear problems like VEP at yield onset.
divergence_retries : int, default=0
If SNES returns a DIVERGED reason after the main solve, re-call
the underlying Newton up to this many times with a warm start
(``zero_init_guess=False``) to try to rescue. Each retry uses
the just-computed iterate plus the freshly-advected stress
history, which is often enough for VEP at yield onset (Min/softmin
kinks) to step off a bad Newton iterate. ``0`` preserves legacy
behaviour (divergence is terminal). Typical useful value is 1.
Only applies in the VE/VEP branch (``DFDt is not None``).
"""
has_stress_history = self.Unknowns.DFDt is not None
if has_stress_history:
if timestep is None:
raise ValueError(
"timestep is required for viscoelastic solve. "
"Call stokes.solve(timestep=dt)"
)
# dt_elastic must always equal the solve timestep. The constitutive
# model's VE formulas (eta_eff, stress history terms) all reference
# Parameters.dt_elastic. If it differs from the actual timestep,
# the stress computation is inconsistent with the time integration.
self.constitutive_model.Parameters.dt_elastic = timestep
if order is None or order > self._order:
order = self._order
if _force_setup:
self._needs_function_rewire = True
# Re-setup when effective_order changes (DDt history ramp-up)
_current_eff_order = self.constitutive_model.effective_order
if not hasattr(self, '_prev_effective_order'):
self._prev_effective_order = None
if _current_eff_order != self._prev_effective_order:
self._needs_function_rewire = True
self.constitutive_model._solver_is_setup = False
self._prev_effective_order = _current_eff_order
if not self.constitutive_model._solver_is_setup:
self._needs_function_rewire = True
self.DFDt.psi_fn = self.constitutive_model.flux.T
if not self.is_setup:
self._setup_pointwise_functions(verbose)
self._setup_discretisation(verbose)
self._setup_solver(verbose)
# 1. ADVECT stress history along characteristics
if uw.mpi.rank == 0 and verbose:
print(f"Stokes solver - advect stress history", flush=True)
self.DFDt.update_pre_solve(timestep, verbose=verbose, evalf=evalf,
store_result=False)
# Uniform pre-solve coefficient hook: VEP delegates to
# _update_bdf_coefficients(); MaxwellExponentialFlowModel updates
# α, φ on the DDt via _update_exp_coefficients(). No isinstance
# checks at the solver layer.
self.constitutive_model._update_history_coefficients()
# 2. SOLVE
if uw.mpi.rank == 0 and verbose:
print(f"Stokes solver - solve", flush=True)
super().solve(
zero_init_guess,
_force_setup=_force_setup,
verbose=verbose,
picard=picard,
divergence_retries=divergence_retries,
)
# 3. PROJECT actual stress and SHIFT history
if uw.mpi.rank == 0 and verbose:
print(f"Stokes solver - store stress and shift history", flush=True)
import numpy as np
_advected_sigma_star = np.copy(self.DFDt.psi_star[0].array[...])
if getattr(self.DFDt, '_psi_star_use_multicomponent', False):
# Multi-component projection of flux → psi_star[0].
#
# The DFDt's source-snapshot machinery (enabled once in
# _create_stress_history_ddt) intercepts psi_fn assignment
# to substitute psi_star[0] symbols with a frozen
# psi_snapshot variable, refreshed each step in
# update_pre_solve. So the projection's compiled source
# reads from psi_snapshot (not psi_star[0] itself) and is a
# true one-shot Galerkin projection — no implicit
# fixed-point iteration.
self.DFDt._psi_star_projection_solver.smoothing = 0.0
self.DFDt._psi_star_projection_solver.solve(verbose=verbose)
# Fan flat result back to psi_star[0] tensor variable
for k, (i, j) in enumerate(self.DFDt._psi_star_indep_indices):
vals = self.DFDt._psi_star_flat_var.array[:, 0, k]
self.DFDt.psi_star[0].array[:, i, j] = vals
if i != j:
self.DFDt.psi_star[0].array[:, j, i] = vals
else:
self.DFDt._psi_star_projection_solver.uw_function = self.constitutive_model.flux
self.DFDt._psi_star_projection_solver.smoothing = 0.0
self.DFDt._psi_star_projection_solver.solve(verbose=verbose)
for i in range(self.DFDt.order - 1, 0, -1):
if i == 1:
self.DFDt.psi_star[i].array[...] = _advected_sigma_star
else:
self.DFDt.psi_star[i].array[...] = self.DFDt.psi_star[i - 1].array[...]
self.DFDt.update_post_solve(timestep, verbose=verbose, evalf=evalf)
# Uniform post-solve hook for any extra integrator-state storage.
# VEP: no-op. ETD-2 / MaxwellExponentialFlowModel: refresh
# forcing_star with current ε̇^{n+1} so the next step's history
# term has access to ε̇ⁿ.
self.constitutive_model._update_history_post_solve()
self.is_setup = True
self.constitutive_model._solver_is_setup = True
else:
# Plain Stokes — no stress history
super().solve(
zero_init_guess,
_force_setup=_force_setup,
verbose=verbose,
divergence_retries=divergence_retries,
)
@property
def tau(self):
r"""Deviatoric stress from the most recent solve.
When stress history is active (VEP), returns ``psi_star[0]`` which
contains the actual projected stress. Otherwise falls through to the
base class lazy projection.
"""
if self.Unknowns.DFDt is not None:
return self.DFDt.psi_star[0]
return super().tau
# =========================================================================
# PETSc Residual Templates
# These define the weak form terms assembled by PETSc's finite element system.
# F0/F1 are velocity equation terms, PF0 is the pressure/continuity equation.
# =========================================================================
F0 = Template(
r"\mathbf{f}_0\left( \mathbf{u} \right)",
lambda self: -self.bodyforce.sym,
r"""Velocity equation body force term (pointwise).
The $\mathbf{f}_0$ term represents external body forces acting on the
fluid, typically gravity. This is the negative of the ``bodyforce`` parameter.
""",
)
F1 = Template(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
lambda self: (
self.stress + self.penalty * self.div_u * sympy.eye(self.mesh.dim)
),
r"""Velocity equation flux/stress term (pointwise).
The $\mathbf{F}_1$ tensor represents the stress response of the fluid,
combining deviatoric stress $\boldsymbol{\tau}$, pressure $p$,
and penalty term for weak incompressibility.
""",
)
PF0 = Template(
r"\mathbf{h}_0\left( \mathbf{p} \right)",
lambda self: sympy.Matrix((self.constraints)),
r"""Pressure equation constraint term (continuity).
The $h_0$ term enforces the incompressibility constraint
$\nabla \cdot \mathbf{u} = 0$. Additional constraints can be
added via ``add_condition()``.
""",
)
# deprecated
[docs]
@timing.routine_timer_decorator
def stokes_problem_description(self):
"""Build residual terms for Stokes FEM assembly (deprecated)."""
# f0 residual term
self._u_f0 = self.F0.sym
# f1 residual term
self._u_f1 = self.F1.sym
# p0 residual term
self._p_f0 = self.PF0.sym
return
@property
def CM_is_setup(self):
"""Whether the constitutive model is configured for this solver."""
return self._constitutive_model._solver_is_setup
@property
def strainrate(self):
r"""Symmetric strain rate tensor from velocity gradients.
The strain rate tensor :math:`\dot{\varepsilon}` is computed as:
.. math::
\dot{\varepsilon}_{ij} = \frac{1}{2}\left(\frac{\partial u_i}{\partial x_j}
+ \frac{\partial u_j}{\partial x_i}\right)
Returns
-------
sympy.Matrix
Symmetric tensor of shape ``(dim, dim)`` where ``dim`` is the
mesh dimensionality.
See Also
--------
strainrate_1d : Voigt notation (vector) form.
stress_deviator : Deviatoric stress computed from strain rate.
"""
return sympy.Matrix(self.mesh.vector.strain_tensor(self.Unknowns.u.sym))
@property
def strainrate_1d(self):
r"""Strain rate in Voigt notation (vector form).
Converts the symmetric strain rate tensor to Voigt notation for
use in constitutive model calculations. In 2D, returns a 3-component
vector; in 3D, a 6-component vector.
Returns
-------
sympy.Matrix
Strain rate in Voigt notation.
See Also
--------
strainrate : Full tensor form.
"""
return uw.maths.tensor.rank2_to_voigt(self.strainrate, self.mesh.dim)
@property
def strainrate_star_1d(self):
r"""Historical strain rate in Voigt notation (for viscoelastic models).
Used in viscoelastic formulations where the stress depends on
both current and historical strain rates.
Returns
-------
sympy.Matrix
Historical strain rate in Voigt notation.
See Also
--------
strainrate_1d : Current strain rate.
"""
return uw.maths.tensor.rank2_to_voigt(self.strainrate_star, self.mesh.dim)
@property
def stress_deviator(self):
r"""Deviatoric stress tensor from the constitutive model.
The deviatoric stress :math:`\boldsymbol{\tau}` is the traceless part
of the stress tensor, computed by the constitutive model from the
strain rate:
.. math::
\boldsymbol{\tau} = \boldsymbol{\eta} : \dot{\boldsymbol{\varepsilon}}
For a Newtonian fluid: :math:`\boldsymbol{\tau} = 2\eta\dot{\boldsymbol{\varepsilon}}`
Returns
-------
sympy.Matrix
Deviatoric stress tensor of shape ``(dim, dim)``.
See Also
--------
stress : Total stress (deviatoric + pressure).
constitutive_model : Provides the viscosity relationship.
"""
return self.constitutive_model.flux
@property
def stress_deviator_1d(self):
r"""Deviatoric stress in Voigt notation.
Returns
-------
sympy.Matrix
Deviatoric stress in Voigt notation.
See Also
--------
stress_deviator : Full tensor form.
"""
return uw.maths.tensor.rank2_to_voigt(self.stress_deviator, self.mesh.dim)
@property
def stress(self):
r"""Total Cauchy stress tensor.
The total stress combines the deviatoric stress and pressure:
.. math::
\boldsymbol{\sigma} = \boldsymbol{\tau} - p\mathbf{I}
where :math:`\boldsymbol{\tau}` is the deviatoric stress and
:math:`p` is the pressure (positive in compression).
Returns
-------
sympy.Matrix
Total stress tensor of shape ``(dim, dim)``.
See Also
--------
stress_deviator : Deviatoric (traceless) part.
"""
return self.stress_deviator - sympy.eye(self.mesh.dim) * (self.p.sym[0])
@property
def stress_1d(self):
r"""Total stress in Voigt notation.
Returns
-------
sympy.Matrix
Total stress in Voigt notation.
See Also
--------
stress : Full tensor form.
"""
return uw.maths.tensor.rank2_to_voigt(self.stress, self.mesh.dim)
@property
def div_u(self):
r"""Velocity divergence.
For incompressible flow, this should be zero:
.. math::
\nabla \cdot \mathbf{u} = 0
Returns
-------
sympy.Expr
Scalar divergence expression.
Notes
-----
Non-zero divergence indicates compressibility or mass sources/sinks.
"""
return self.mesh.vector.divergence(self.Unknowns.u.sym)
@property
def constraints(self):
r"""Constraint equation for the saddle-point system.
By default, this is the incompressibility constraint
:math:`\nabla \cdot \mathbf{u} = 0`. Can be modified for
compressible or other constrained formulations.
Returns
-------
sympy.Expr
Constraint expression.
"""
return self._constraints
@constraints.setter
def constraints(self, constraints_matrix):
"""Set the constraint equation (e.g., incompressibility)."""
self._is_setup = False
symval = sympify(constraints_matrix)
self._constraints = symval
@property
def bodyforce(self):
r"""Body force vector (source term).
The volumetric body force :math:`\mathbf{f}` appears on the
right-hand side of the momentum equation:
.. math::
-\nabla \cdot \boldsymbol{\sigma} = \mathbf{f}
Common examples include gravity (:math:`\rho\mathbf{g}`) or
buoyancy forces.
Returns
-------
UWexpression
Body force vector expression.
"""
return self._bodyforce
@bodyforce.setter
def bodyforce(self, value):
"""Set the body force vector (e.g., gravity, buoyancy)."""
self._needs_function_rewire = True
if isinstance(value, uw.function.expressions.UWexpression):
self._bodyforce.sym = value.sym
else:
# Convert UWQuantity objects to SymPy expressions before Matrix creation
converted_value = []
for item in value:
if isinstance(item, uw.function.quantities.UWQuantity):
sympified = item._sympify_()
# If UWQuantity contains a Matrix, extract the scalar element
if hasattr(sympified, "shape") and sympified.shape == (1, 1):
converted_value.append(sympified[0, 0])
else:
converted_value.append(sympified)
else:
converted_value.append(item)
self._bodyforce.sym = sympy.Matrix(converted_value)
@property
def saddle_preconditioner(self):
r"""Preconditioner for the Schur complement in the saddle-point system.
For the Stokes system, the default preconditioner is :math:`1/\eta`
(inverse viscosity), which approximates the Schur complement
:math:`\mathbf{S} \approx \mathbf{B}\mathbf{A}^{-1}\mathbf{B}^T`.
Returns
-------
sympy.Expr
Preconditioner expression (typically inverse viscosity).
Notes
-----
A good preconditioner significantly improves convergence of the
iterative solver. For variable viscosity, use the local viscosity.
"""
return self._saddle_preconditioner
@saddle_preconditioner.setter
def saddle_preconditioner(self, value):
"""Set the Schur complement preconditioner."""
self._needs_function_rewire = True
symval = sympify(value)
self._saddle_preconditioner = symval
@property
def penalty(self):
r"""Augmented Lagrangian penalty parameter.
The penalty $\lambda$ adds a term to the weak form that
penalizes non-zero divergence:
.. math::
\lambda \int (\nabla \cdot \mathbf{u})(\nabla \cdot \mathbf{v}) \, dV
This improves convergence for incompressible flow without
changing the solution (since $\nabla \cdot \mathbf{u} = 0$
at convergence).
Returns
-------
UWexpression
Augmented Lagrangian penalty parameter (typically $O(1)$).
Notes
-----
Set to zero for standard Stokes without augmentation.
Unlike classical penalty methods that require very large values,
the Augmented Lagrangian approach uses modest penalties of $O(1)$
to improve solver convergence.
References
----------
Glowinski, R., & Le Tallec, P. (1989). *Augmented Lagrangian and
Operator-Splitting Methods in Nonlinear Mechanics*. SIAM.
https://doi.org/10.1137/1.9781611970838
"""
return self._penalty
@penalty.setter
def penalty(self, value):
"""Set the augmented Lagrangian penalty parameter."""
self._needs_function_rewire = True
self._penalty.sym = value
# @property
# def continuity_rhs(self):
# return self._continuity_rhs
# @continuity_rhs.setter
# def continuity_rhs(self, value):
# self.is_setup = False
# self._continuity_rhs.sym = value
[docs]
@timing.routine_timer_decorator
def estimate_dt(self):
r"""
Calculates an appropriate advective timestep for the Stokes solver.
The Stokes equations are quasi-static (no time derivative ∂v/∂t),
so there is no diffusive CFL constraint. The only relevant timescale
is the advective one: how long it takes material to cross an element.
This method computes a per-element timestep:
.. math::
\delta t_i = h_i / \|\mathbf{v}_i\|
where $h_i$ is the element radius and $\mathbf{v}_i$ is the velocity at the element
centroid, then returns the global minimum. This is more accurate than
using global max velocity with global min element size, especially for
non-uniform meshes with spatially varying velocity.
Returns:
Pint Quantity or float: The advective timestep with physical time units
if a model with reference scales is available, otherwise nondimensional.
"""
from mpi4py import MPI
# Evaluate velocity at element centroids (consistent with AdvDiff)
vel = uw.function.evaluate(self.u.sym, self.mesh._centroids)
# If vel is unit-aware (UnitAwareArray), nondimensionalise it to get
# consistent nondimensional values that match mesh._radii
# Note: .magnitude returns physical units, which would be wrong here
if hasattr(vel, "units") and vel.units is not None:
vel = uw.non_dimensionalise(vel)
elif hasattr(vel, "magnitude"):
# Plain UWQuantity without units context - use magnitude
vel = vel.magnitude
# Ensure vel is a plain numpy array
vel = np.asarray(vel)
# Squeeze out any singleton dimensions from evaluate (shape: N,1,dim -> N,dim)
vel = np.squeeze(vel)
# Handle edge case of single element (ensure 2D)
if vel.ndim == 1:
vel = vel.reshape(1, -1)
# Get per-element velocity magnitudes
vel_magnitudes = np.linalg.norm(vel, axis=1)
# Get per-element radii (characteristic element size)
element_radii = self.mesh._radii
# Compute per-element advective timestep: dt_i = h_i / |v_i|
# Avoid division by zero for elements with zero velocity
with np.errstate(divide='ignore', invalid='ignore'):
dt_per_element = np.where(
vel_magnitudes > 0,
element_radii / vel_magnitudes,
np.inf
)
# Get local minimum timestep
min_dt_local = np.min(dt_per_element) if len(dt_per_element) > 0 else np.inf
# Get global minimum timestep (parallel-safe)
comm = uw.mpi.comm
min_dt_glob = comm.allreduce(min_dt_local, op=MPI.MIN)
# If all velocities are zero, return infinity
if np.isinf(min_dt_glob):
return np.inf
# Dimensionalise to physical time
try:
return uw.dimensionalise(np.squeeze(min_dt_glob), {'[time]': 1})
except Exception:
return np.squeeze(min_dt_glob)
[docs]
class SNES_VE_Stokes(SNES_Stokes):
r"""Viscoelastic Stokes solver (backward-compatibility wrapper).
.. deprecated::
Use ``uw.systems.Stokes`` directly with a
``ViscoElasticPlasticFlowModel`` constitutive model. The Stokes
solver now creates stress history infrastructure automatically
when the constitutive model is assigned (the lazy-creation
pathway also reads ``stress_history_ddt_kwargs`` from the model
— required for ``integrator='etd'`` to allocate
``forcing_star``). VE_Stokes pre-creates the DDt at solver
``__init__`` time, before the model exists, so it can't see
those kwargs and is incompatible with ``integrator='etd'``.
Migration: replace::
stokes = uw.systems.VE_Stokes(mesh, velocityField=v, pressureField=p, order=2)
stokes.constitutive_model = uw.constitutive_models.ViscoElasticPlasticFlowModel
with::
stokes = uw.systems.Stokes(mesh, velocityField=v, pressureField=p)
stokes.constitutive_model = uw.constitutive_models.ViscoElasticPlasticFlowModel(
stokes.Unknowns, order=2,
)
Parameters
----------
mesh : Mesh
The computational mesh.
order : int, default=2
BDF time integration order for stress history.
**kwargs
All other arguments are passed to :class:`SNES_Stokes`.
"""
instances = 0
[docs]
def __init__(
self,
mesh: uw.discretisation.Mesh,
velocityField: Optional[uw.discretisation.MeshVariable] = None,
pressureField: Optional[uw.discretisation.MeshVariable] = None,
degree: Optional[int] = 2,
order: Optional[int] = 2,
p_continuous: Optional[bool] = True,
verbose: Optional[bool] = False,
DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
):
import warnings
warnings.warn(
"VE_Stokes is deprecated. Use uw.systems.Stokes(...) directly and "
"assign the constitutive model afterwards — the Stokes solver creates "
"DDt infrastructure lazily when the model is assigned, and the lazy "
"path correctly forwards stress_history_ddt_kwargs from the model "
"(required for integrator='etd' to allocate forcing_star). "
"VE_Stokes pre-creates DDt at __init__ time, before the model exists, "
"so it cannot use ETD-2. See the class docstring for migration.",
DeprecationWarning, stacklevel=2,
)
super().__init__(
mesh,
velocityField,
pressureField,
degree,
p_continuous,
verbose,
DuDt=DuDt,
DFDt=DFDt,
)
# Pre-create DFDt so it's available before constitutive model is set
self._create_stress_history_ddt(order=order)
return
@property
def delta_t(self):
"""Elastic timestep from the constitutive model."""
return self.constitutive_model.Parameters.dt_elastic
[docs]
class SNES_Projection(SNES_Scalar):
r"""
Scalar projection solver for mapping functions to mesh variables.
Solves :math:`u = \tilde{f}` where :math:`\tilde{f}` is a function that
can be evaluated within an element and :math:`u` is a mesh variable with
associated shape functions.
Typically used to obtain a continuous representation of a function not
well-defined at mesh nodes (e.g., derivatives or flux components). More
broadly, it is a projection from one basis to another.
Strong form (the screened-Poisson smoother)
-------------------------------------------
The projection is implemented by solving
.. math::
u - \nabla \cdot \left( \alpha \nabla u \right) = \tilde{f},
or equivalently, in the more familiar Helmholtz form
.. math::
u - \alpha \, \nabla^{2} u \;=\; \tilde{f}.
With :math:`\alpha = 0` this is a pure pointwise L2 projection of
:math:`\tilde f` onto the discrete space of :math:`u`. With
:math:`\alpha > 0` it is a *screened-Poisson smoother*: the equation
enforces a balance between fidelity to :math:`\tilde f` and curvature of
:math:`u`. The natural length scale that emerges from this balance is
.. math::
L \;=\; \sqrt{\alpha},
so :math:`\alpha` has dimensions of **length squared**. The free-space
Green's function of the operator decays as
:math:`\exp(-r/L)` (in 2D it is
:math:`G(r) \propto K_{0}(r/L)/L^{2}`), so the solution behaves like a
Gaussian-like convolution of :math:`\tilde f` of width :math:`L` —
obtained implicitly by one elliptic solve, without ever assembling
the kernel. Features in :math:`\tilde f` of scale much smaller than
:math:`L` are attenuated; features much larger than :math:`L` pass
through essentially unchanged.
Weak form
---------
Multiplying by a test function :math:`v` and integrating by parts gives
the symmetric weak form actually assembled by PETSc:
.. math::
\int_\Omega (u - \tilde f)\, v \; + \;
\int_\Omega \alpha \, \nabla u \cdot \nabla v
\;=\; 0,
which is exactly minimising
:math:`\tfrac{1}{2}\!\int (u-\tilde f)^2 + \tfrac{\alpha}{2}\!\int
|\nabla u|^2` — a Tikhonov-regularised L2 projection.
Setting the smoothing length
----------------------------
Two equivalent accessors are provided:
* :attr:`smoothing` — set the coefficient :math:`\alpha` directly
(units of length²). Historically used with tiny values (e.g.
:math:`10^{-6}`) as a *numerical* regulariser, which corresponds to
a sub-grid :math:`L` and produces no physical smoothing.
* :attr:`smoothing_length` — set :math:`L` directly (length units,
unit-aware via :func:`underworld3.non_dimensionalise`). This is the
recommended path when you actually want the projection to act as
a low-pass filter of a chosen physical scale.
See Also
--------
SNES_Vector_Projection : Vector field projection.
SNES_Tensor_Projection : Tensor field projection.
Parameters
----------
mesh : Mesh
The computational mesh.
u_Field : MeshVariable, optional
Target mesh variable for the projection.
scalar_Field : MeshVariable, optional
Alternative name for the target field.
degree : int, default=2
Polynomial degree for the finite element space.
solver_name : str, optional
Name for the solver instance.
verbose : bool, default=False
Enable verbose output.
"""
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
u_Field: uw.discretisation.MeshVariable = None,
degree=2,
verbose=False,
):
super().__init__(
mesh,
u_Field,
degree,
verbose,
)
self._needs_function_rewire = True
self._smoothing = sympy.sympify(0)
self._uw_weighting_function = sympy.sympify(1)
self._uw_function = sympy.Matrix([0]) # Default: project zero
self._constitutive_model = uw.constitutive_models.Constitutive_Model(self.Unknowns)
return
# =========================================================================
# PETSc Residual Templates
# L2 projection: minimize ||u - f||^2 with optional smoothing
# =========================================================================
F0 = Template(
r"f_0 \left( \mathbf{u} \right)",
lambda self: (self.u.sym - self.uw_function) * self.uw_weighting_function,
r"""Projection misfit term (pointwise).
The $f_0$ term measures the weighted difference between the mesh
variable and the target function for L2 projection.
""",
)
F1 = Template(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
lambda self: self.smoothing * self.mesh.vector.gradient(self.u.sym),
r"""Projection smoothing term (pointwise).
The $\mathbf{F}_1$ term provides optional regularization via
$\alpha \nabla u$, where $\alpha$ is the ``smoothing`` parameter.
""",
)
# Use SymbolicProperty for automatic unwrapping
uw_function = SymbolicProperty(matrix_wrap=True, doc="Function to project onto mesh")
@property
def smoothing(self):
r"""Smoothing coefficient :math:`\alpha` of the screened-Poisson form.
The projection solves
:math:`u - \nabla\!\cdot\!(\alpha\,\nabla u) = \tilde f`, so
:math:`\alpha` has dimensions of **length²** and the implied
smoothing length is :math:`L = \sqrt{\alpha}`. The free-space
Green's function decays as :math:`\exp(-r/L)`, giving the
projection the action of a Gaussian-like convolution of width
:math:`L` — without ever forming the kernel.
See the class docstring for the full derivation.
Two usage patterns
~~~~~~~~~~~~~~~~~~
* **Pure L2 projection** — ``smoothing = 0`` (or omit). No
regularisation; ``u`` is the best L2 fit to ``ũ``.
* **Genuine length-scale smoother** — set
:attr:`smoothing_length` to the desired physical length
(unit-aware), or equivalently
``smoothing = L**2``. The output is ``ũ`` smoothed at
scale ``L``.
.. note::
Historical UW3 code occasionally sets ``smoothing`` to a
tiny number (e.g. ``1e-6``) as a *numerical* regulariser
against rank deficiency. That value corresponds to
:math:`L \approx 10^{-3}` in the problem's ND length
units — almost always well below the cell size, so it
does no useful smoothing. Use a true sub-grid value only
if you need that regularisation; for filtering, use
:attr:`smoothing_length`.
"""
return self._smoothing
@smoothing.setter
def smoothing(self, smoothing_factor):
"""Set the smoothing regularization parameter."""
self._needs_function_rewire = True
self._smoothing = sympify(smoothing_factor)
@property
def smoothing_length(self):
r"""Smoothing length scale :math:`L` (length units, **unit-aware**).
L-valued view on :attr:`smoothing` with the convention
:math:`L = \sqrt{\alpha}`. Setting ``smoothing_length = L``
is equivalent to setting ``smoothing = L**2``, but is the
natural physical knob because :math:`L` is what the
smoother actually filters by.
Mathematical meaning
--------------------
The projection then solves the screened-Poisson equation
.. math::
u \;-\; L^{2}\,\nabla^{2} u \;=\; \tilde f,
whose Green's function decays as :math:`\exp(-r/L)`. In
practice this acts like a Gaussian convolution of width
:math:`L`:
* features in :math:`\tilde f` with spatial scale
:math:`\ll L` are damped (roughly as
:math:`1/(1+k^{2}L^{2})` for a wavenumber :math:`k`);
* features with scale :math:`\gg L` are preserved;
* features at :math:`\sim L` are attenuated by a factor
near ``1/2``.
Choosing :math:`L` smaller than the local mesh size has
essentially no effect (the field is already band-limited
by the discretisation). A useful default for *light*
de-noising is :math:`L \approx 1\!-\!2\,h`, where
:math:`h` is a representative cell size.
Units
-----
The setter accepts a plain number (assumed already
non-dimensional), a pint ``Quantity`` with length units,
or any unit-aware object understood by
:func:`underworld3.non_dimensionalise`. Internally the
squared non-dimensional value is stored in
``self._smoothing`` (so ``smoothing`` and
``smoothing_length`` stay consistent).
The getter returns a Pint ``Quantity`` with length units
when a scaling context is configured; otherwise the plain
non-dimensional float :math:`\sqrt{\alpha}`.
"""
import sympy
s = self._smoothing
try:
sval = float(s)
except (TypeError, ValueError):
return sympy.sqrt(s)
if sval < 0:
return None
L_nd = sval ** 0.5
# Re-dimensionalise to length units if a scaling context
# is set; fall back to the plain ND float otherwise.
try:
return uw.scaling.dimensionalise(
L_nd, uw.scaling.units.meter)
except Exception:
return L_nd
@smoothing_length.setter
def smoothing_length(self, L):
"""Set the smoothing length scale.
Accepts a Pint Quantity (with length units), a UnitAware
scalar, or a plain non-dimensional number. The value is
non-dimensionalised through the active scaling context
before being squared and stored as ``self._smoothing``.
"""
self._needs_function_rewire = True
# Unit-aware: route through non_dimensionalise so the
# caller can pass `2.0 * uw.scaling.units.meter` or a
# plain float interchangeably.
try:
L_nd = uw.non_dimensionalise(L)
except Exception:
# Fall back to magnitude-or-float coercion if the
# value doesn't carry/expect units.
if hasattr(L, "magnitude"):
L_nd = L.magnitude
else:
L_nd = L
self._smoothing = sympify(L_nd) ** 2
@property
def uw_weighting_function(self):
"""Weighting function applied during projection."""
return self._uw_weighting_function
@uw_weighting_function.setter
def uw_weighting_function(self, user_uw_function):
"""Set the weighting function for the projection."""
self._needs_function_rewire = True
self._uw_weighting_function = user_uw_function
## --------------------------------
## Project from pointwise vector
## functions to nodal point unknowns
##
## We can add boundary constraints in the usual way (parent class handles this)
## We can add smoothing (which takes the form of a viscosity term)
## We could consider an incompressibility constraint ... or remove null spaces
## --------------------------------
[docs]
class SNES_Vector_Projection(SNES_Vector):
r"""
Vector projection solver for mapping vector functions to mesh variables.
Solves :math:`\mathbf{u} = \tilde{\mathbf{f}}` where
:math:`\tilde{\mathbf{f}}` is a vector function that can be evaluated
within an element and :math:`\mathbf{u}` is a vector mesh variable
with associated shape functions.
Typically used to obtain a continuous representation of a vector
function not well-defined at mesh nodes (e.g., gradient or flux
vectors), or as a length-scale-aware smoother of an existing vector
field.
Strong form (screened-Poisson, vector-valued)
---------------------------------------------
.. math::
\mathbf{u} \;-\; \nabla \cdot \left( \alpha\, \nabla \mathbf{u}
\right)
\;+\; \lambda \left( \nabla \cdot \mathbf{u} \right) \mathbf{I}
\;=\; \tilde{\mathbf{f}} .
The :math:`\alpha`-term is the same screened-Poisson smoother as
in :class:`SNES_Projection`, applied component-wise: it has the same
:math:`L = \sqrt{\alpha}` smoothing-length interpretation and the
same :math:`\exp(-r/L)` Green's function. The extra :math:`\lambda`
term is a divergence penalty (see :attr:`penalty`) — set it nonzero
when you want an approximately solenoidal projection of
:math:`\tilde{\mathbf{f}}`.
Setting :math:`\alpha = 0` (and :math:`\lambda = 0`) gives a pure
pointwise L2 projection. See :class:`SNES_Projection` for the full
mathematical context; the relationship between :attr:`smoothing`
(units length²) and :attr:`smoothing_length` (units length) is the
same as for the scalar projection.
Parameters
----------
mesh : Mesh
The computational mesh.
u_Field : MeshVariable, optional
Target vector mesh variable for the projection.
degree : int, default=2
Polynomial degree for the finite element space.
verbose : bool, default=False
Enable verbose output.
See Also
--------
SNES_Projection : Scalar field projection (full mathematical detail).
SNES_Tensor_Projection : Tensor field projection.
"""
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
u_Field: uw.discretisation.MeshVariable = None,
degree=2,
verbose=False,
):
super().__init__(
mesh,
u_Field,
degree,
verbose,
)
self._needs_function_rewire = True
self._smoothing = 0.0
self._penalty = 0.0
self._uw_weighting_function = 1.0
self._uw_function = sympy.Matrix([[0] * self.mesh.dim]) # Default: project zero vector
self._constitutive_model = uw.constitutive_models.Constitutive_Model(self.Unknowns)
return
# Use Template for persistent read-only template expressions
F0 = Template(
r"f_0 \left( \mathbf{u} \right)",
lambda self: (self.u.sym - self.uw_function) * self.uw_weighting_function,
"Vector projection pointwise misfit term: f_0(u)",
)
F1 = Template(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
lambda self: (
self.smoothing * self.Unknowns.E
+ self.penalty * self.mesh.vector.divergence(self.u.sym) * sympy.eye(self.mesh.dim)
),
"Vector projection pointwise smoothing term: F_1(u)",
)
[docs]
@timing.routine_timer_decorator
def projection_problem_description(self):
"""Build residual terms for vector projection FEM assembly."""
# residual terms - defines the problem:
# solve for a best fit to the continuous mesh
# variable given the values in self.function
# F0 is left in place for the user to inject
# non-linear constraints if required
self._f0 = self.F0.sym
# F1 is left in the users control ... e.g to add other gradient constraints to the stiffness matrix
self._f1 = (
self.F1.sym
+ self.smoothing * self.Unknowns.E
+ self.penalty * self.mesh.vector.divergence(self.u.sym) * sympy.eye(self.mesh.dim)
)
return
# Use SymbolicProperty for automatic unwrapping
uw_function = SymbolicProperty(matrix_wrap=True, doc="Vector function to project onto mesh")
@property
def smoothing(self):
r"""Smoothing coefficient :math:`\alpha` (units **length²**).
Coefficient of the :math:`\nabla\!\cdot\!(\alpha\,\nabla
\mathbf u)` term in the vector screened-Poisson equation
(see the class docstring). Acts component-wise; the
smoothing length is :math:`L = \sqrt{\alpha}` and the
Green's function decays as :math:`\exp(-r/L)`.
Use :attr:`smoothing_length` for the L-valued, unit-aware
knob. See :attr:`SNES_Projection.smoothing` for the full
derivation and usage patterns.
"""
return self._smoothing
@smoothing.setter
def smoothing(self, smoothing_factor):
"""Set the smoothing regularization parameter."""
self._needs_function_rewire = True
self._smoothing = sympify(smoothing_factor)
@property
def smoothing_length(self):
r"""Smoothing length :math:`L` (length units, **unit-aware**).
L-valued view on :attr:`smoothing`, with
:math:`L = \sqrt{\alpha}`. The projection then acts as a
component-wise screened-Poisson smoother
:math:`\mathbf u - L^{2}\,\nabla^{2}\mathbf u =
\tilde{\mathbf f}`, i.e. a Gaussian-like convolution of
width :math:`L` applied to each Cartesian component of
the input vector field.
Choose :math:`L \gtrsim h` (a cell size) for noticeable
smoothing; :math:`L < h` does essentially nothing because
the discretisation already band-limits at that scale.
See :attr:`SNES_Projection.smoothing_length` for the full
mathematical and units discussion.
"""
import sympy
s = self._smoothing
try:
sval = float(s)
except (TypeError, ValueError):
return sympy.sqrt(s)
if sval < 0:
return None
L_nd = sval ** 0.5
try:
return uw.scaling.dimensionalise(
L_nd, uw.scaling.units.meter)
except Exception:
return L_nd
@smoothing_length.setter
def smoothing_length(self, L):
"""Set the smoothing length scale (unit-aware)."""
self._needs_function_rewire = True
try:
L_nd = uw.non_dimensionalise(L)
except Exception:
if hasattr(L, "magnitude"):
L_nd = L.magnitude
else:
L_nd = L
self._smoothing = sympify(L_nd) ** 2
@property
def penalty(self):
r"""Divergence penalty :math:`\lambda` for (approx.) incompressibility.
Coefficient of the :math:`\lambda (\nabla\!\cdot\!\mathbf u)
\mathbf I` term in the vector projection. Large positive
values bias the projection toward
:math:`\nabla\!\cdot\!\mathbf u = 0`, i.e. a solenoidal
approximation of :math:`\tilde{\mathbf f}`. Has no length
interpretation — unlike :attr:`smoothing` it does not
introduce a filter scale.
"""
return self._penalty
@penalty.setter
def penalty(self, value):
"""Set the divergence penalty parameter."""
self._needs_function_rewire = True
symval = sympify(value)
self._penalty = symval
@property
def uw_weighting_function(self):
"""Weighting function applied during projection."""
return self._uw_weighting_function
@uw_weighting_function.setter
def uw_weighting_function(self, user_uw_function):
"""Set the weighting function for the projection."""
self._needs_function_rewire = True
self._uw_weighting_function = user_uw_function
class SNES_Tensor_Projection(SNES_Projection):
r"""
Tensor projection solver for mapping tensor functions to mesh variables.
Solves :math:`\mathbf{u} = \tilde{\mathbf{f}}` where
:math:`\tilde{\mathbf{f}}` is a tensor-valued function that can be
evaluated within an element and :math:`\mathbf{u}` is a tensor mesh
variable with associated shape functions.
Typically used to obtain a continuous representation of a tensor
function not well-defined at mesh nodes (e.g., stress or strain
tensors), with optional length-scale smoothing.
Strong form (screened-Poisson, applied component-wise)
------------------------------------------------------
Internally the solve is decomposed into scalar sub-problems, one per
tensor component :math:`u_{ij}`; each sub-problem is
.. math::
u_{ij} \;-\; \nabla \cdot \left( \alpha\, \nabla u_{ij}
\right) \;=\; \tilde{f}_{ij},
identical to :class:`SNES_Projection`. The smoothing length is
:math:`L = \sqrt{\alpha}` and the Green's function decays as
:math:`\exp(-r/L)`; setting :math:`\alpha = 0` gives a pure
pointwise L2 projection. See :class:`SNES_Projection` for the full
derivation, the choice between :attr:`smoothing` (length²) and
:attr:`smoothing_length` (length, unit-aware), and guidance on
picking :math:`L` relative to the cell size.
Parameters
----------
mesh : Mesh
The computational mesh.
tensor_Field : MeshVariable, optional
Target tensor mesh variable for the projection.
scalar_Field : MeshVariable, optional
Scalar work variable used internally.
degree : int, default=2
Polynomial degree for the finite element space.
verbose : bool, default=False
Enable verbose output.
Notes
-----
Currently implemented component-wise as there is no native solver
for tensor unknowns. Each component sees the same :math:`\alpha`,
so the effective smoothing length :math:`L` is uniform across the
tensor entries.
See Also
--------
SNES_Projection : Scalar field projection (full mathematical detail).
SNES_Vector_Projection : Vector field projection.
"""
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
tensor_Field: uw.discretisation.MeshVariable = None,
scalar_Field: uw.discretisation.MeshVariable = None,
degree=2,
verbose=False,
):
self.t_field = tensor_Field
super().__init__(
mesh=mesh,
u_Field=scalar_Field,
degree=degree,
verbose=verbose,
)
return
## Need to over-ride solve method to run over all components
@timing.routine_timer_decorator
def solve(self, verbose=False, divergence_retries: int = 0):
"""Solve by projecting each tensor component sequentially.
Parameters
----------
verbose : bool
divergence_retries : int, default=0
Forwarded to each per-component SNES solve. 0 preserves
legacy behaviour.
"""
# Loop over the components of the tensor. If this is a symmetric
# tensor, we'll usually be given the 1d form to prevent duplication
# if self.t_field.sym_1d.shape != self.uw_function.shape:
# raise ValueError(
# "Tensor shapes for uw_function and MeshVariable are not the same"
# )
symm = self.t_field.sym.is_symmetric()
for i in range(self.uw_function.shape[0]):
for j in range(self.uw_function.shape[1]):
if symm and j > i:
continue
self.uw_scalar_function = sympy.Matrix([[self.uw_function[i, j]]])
self.u.array[:, 0, 0] = self.t_field.array[:, i, j]
# solve the projection for the scalar sub-problem
super().solve(verbose=verbose)
self.t_field.array[:, i, j] = self.u.array[:, 0, 0]
# That might be all ...
# This is re-defined so it uses uw_scalar_function
@property
def F0(self):
"""Pointwise misfit term for scalar subproblem."""
f0_val = expression(
r"f_0 \left( \mathbf{u} \right)",
(self.u.sym - self.uw_scalar_function) * self.uw_weighting_function,
"Scalar subproblem of tensor projection: f_0(u)",
)
# backward compatibility
self._f0 = f0_val
return f0_val
@property
def F1(self):
"""Pointwise smoothing flux term for scalar subproblem."""
F1_val = expression(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
self.smoothing * self.mesh.vector.gradient(self.u.sym),
"Scalar subproblem of tensor projection (smoothing): F_1(u)",
)
# backward compatibility
self._f1 = F1_val
return F1_val
@property
def uw_scalar_function(self):
"""Current scalar component function being projected."""
return self._uw_scalar_function
@uw_scalar_function.setter
def uw_scalar_function(self, user_uw_function):
"""Set the scalar component function for current tensor element."""
self._needs_function_rewire = True
self._uw_scalar_function = user_uw_function
class SNES_MultiComponent_Projection(SNES_MultiComponent):
r"""
Multi-component projection solver.
Projects an N-component row-matrix expression onto an N-component
mesh variable in a **single** SNES solve, sharing one DM across all
components. Replaces the per-component cycling used by
:class:`SNES_Tensor_Projection`, which tears down and rebuilds the
PETSc DM on every inner iteration.
Strong form (block-diagonal screened Poisson)
---------------------------------------------
There is no cross-component coupling, so each component
:math:`u_k,\ k=1,\dots,N_c` satisfies the same scalar
screened-Poisson equation as :class:`SNES_Projection`,
.. math::
u_k \;-\; \nabla \cdot \left( \alpha\, \nabla u_k \right)
\;=\; \tilde f_k.
Setting :math:`\alpha = 0` gives a pure pointwise L2 projection per
component. The smoothing length is :math:`L = \sqrt{\alpha}` and the
Green's function decays as :math:`\exp(-r/L)` — the same Gaussian-like
convolution interpretation as the scalar case. All components share a
single :math:`\alpha`, so the smoothing scale :math:`L` is uniform
across the multi-component target. Use :attr:`smoothing` to set
:math:`\alpha` (units length²) or :attr:`smoothing_length` for the
L-valued, unit-aware knob. See :class:`SNES_Projection` for the full
derivation and guidance.
Parameters
----------
mesh : Mesh
u_Field : MeshVariable, optional
Target ``(1, n_components)`` MATRIX variable. If ``None``, one is
created.
n_components : int, optional
Number of scalar components; required if ``u_Field`` is ``None``.
degree : int, default=2
verbose : bool, default=False
See Also
--------
SNES_Projection : Scalar projection (Nc=1) — full mathematical detail.
SNES_Tensor_Projection : Legacy per-component cycling projector.
"""
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
u_Field: uw.discretisation.MeshVariable = None,
n_components: int = None,
degree=2,
verbose=False,
):
super().__init__(
mesh,
u_Field=u_Field,
n_components=n_components,
degree=degree,
verbose=verbose,
)
self._needs_function_rewire = True
self._smoothing = sympify(0)
self._uw_weighting_function = sympify(1)
self._uw_function = sympy.zeros(1, self._n_components)
self._constitutive_model = uw.constitutive_models.Constitutive_Model(self.Unknowns)
F0 = Template(
r"f_0 \left( \mathbf{u} \right)",
lambda self: (self.u.sym - self.uw_function) * self.uw_weighting_function,
"Per-component projection misfit (row matrix shape (1, Nc)).",
)
F1 = Template(
r"\mathbf{F}_1 \left( \mathbf{u} \right)",
lambda self: self.smoothing * self.Unknowns.L,
"Per-component block-diagonal Laplacian smoothing (shape (Nc, dim)).",
)
uw_function = SymbolicProperty(
matrix_wrap=True,
doc="Row matrix (1, n_components) of expressions to project.",
)
@property
def smoothing(self):
r"""Smoothing coefficient :math:`\alpha` (units **length²**).
Coefficient of the :math:`\nabla\!\cdot\!(\alpha\,\nabla
u_k)` term in each component's screened-Poisson sub-problem
(see the class docstring). One :math:`\alpha` is shared
across all :math:`N_c` components, so the implied smoothing
length :math:`L = \sqrt{\alpha}` is uniform.
Use :attr:`smoothing_length` for the L-valued, unit-aware
knob. See :attr:`SNES_Projection.smoothing` for the full
derivation and the Gaussian-like convolution interpretation.
"""
return self._smoothing
@smoothing.setter
def smoothing(self, value):
self._needs_function_rewire = True
self._smoothing = sympify(value)
@property
def smoothing_length(self):
r"""Smoothing length :math:`L` (length units, **unit-aware**).
L-valued view on :attr:`smoothing`, with
:math:`L = \sqrt{\alpha}`. Each component then satisfies
:math:`u_k - L^{2}\,\nabla^{2} u_k = \tilde f_k`, i.e. a
Gaussian-like convolution of width :math:`L` applied
independently to each component of the multi-component
target. See :attr:`SNES_Projection.smoothing_length` for the
full mathematical and units discussion.
"""
import sympy
s = self._smoothing
try:
sval = float(s)
except (TypeError, ValueError):
return sympy.sqrt(s)
if sval < 0:
return None
L_nd = sval ** 0.5
try:
return uw.scaling.dimensionalise(
L_nd, uw.scaling.units.meter)
except Exception:
return L_nd
@smoothing_length.setter
def smoothing_length(self, L):
"""Set the smoothing length scale (unit-aware)."""
self._needs_function_rewire = True
try:
L_nd = uw.non_dimensionalise(L)
except Exception:
if hasattr(L, "magnitude"):
L_nd = L.magnitude
else:
L_nd = L
self._smoothing = sympify(L_nd) ** 2
@property
def uw_weighting_function(self):
"""Weighting function applied during projection."""
return self._uw_weighting_function
@uw_weighting_function.setter
def uw_weighting_function(self, value):
self._needs_function_rewire = True
self._uw_weighting_function = value
# #################################################
# # Swarm-based advection-diffusion
# # solver based on SNES_Poisson and swarm-variable
# # projection
# #
# #################################################
[docs]
class SNES_AdvectionDiffusion(SNES_Scalar):
r"""
Advection-diffusion equation solver using semi-Lagrangian Crank-Nicolson.
Implements the characteristics-based method described in Spiegelman & Katz (2006):
.. math::
\underbrace{\frac{\partial u}{\partial t} + \left( \mathbf{v} \cdot \nabla
\right) u}_{\dot{u}} - \nabla \cdot \underbrace{\left[ \boldsymbol{\kappa}
\nabla u \right]}_{\mathbf{F}} = \underbrace{f}_{\mathbf{h}}
The flux term :math:`\mathbf{F}` relates diffusive fluxes to gradients in
:math:`u`. Advective fluxes along the velocity field :math:`\mathbf{v}` are
handled in the :math:`\dot{u}` term.
The time derivative :math:`\dot{u}` involves upstream sampling to find
:math:`u^*`, the value of :math:`u` at points which later arrive at mesh
nodes. This is achieved using a hidden swarm variable advected backwards
from nodal points automatically during solve.
Parameters
----------
mesh : Mesh
The computational mesh.
u_Field : MeshVariable
Mesh variable for the transported scalar.
V_fn : MeshVariable or sympy.Basic
Velocity field for advection.
order : int, default=1
Time integration order (1 or 2).
restore_points_func : callable, optional
Function to restore particles to valid domain.
verbose : bool, default=False
Enable verbose output.
DuDt : SemiLagrangian_DDt or Lagrangian_DDt, optional
Time derivative operator for the unknown.
DFDt : SemiLagrangian_DDt or Lagrangian_DDt, optional
Time derivative operator for the flux.
monotone_mode : str or None, optional
Monotonicity limiter for the semi-Lagrangian trace-back.
Forwarded to the internally-constructed ``SemiLagrangian_DDt``
instances for ``DuDt`` and ``DFDt``.
- ``None`` (default): pure FE trace-back. Can overshoot at
non-nodal upstream points in cells with sharp gradients
(e.g. thin boundary layers in deformed cells); legacy
behaviour.
- ``"clamp"``: clip the FE trace-back result to
``[nbr_min, nbr_max]`` of the ``k = dim + 1`` nearest
``psi_star`` DOFs. Bit-identical to pure FE in smooth
regions; bounds overshoots where the FE interpolant would
otherwise leave the local data range.
- ``"pick"``: keep the FE result where in-bounds; for
out-of-bounds DOFs only, re-evaluate via Shepard's-method
RBF. More conservative than clamp at the catastrophe edge.
When a user supplies a pre-built ``DuDt``, this kwarg is
applied to the internally-constructed ``DFDt`` only — the
user's ``DuDt`` already encodes whatever ``monotone_mode``
it was constructed with.
theta : float, default=0.5
Adams-Moulton theta for the diffusive flux at order 1.
Forwarded to the internally-constructed
``SemiLagrangian_DDt`` instances (same forwarding rule as
``monotone_mode``).
- ``0.5`` (default): Crank-Nicolson, A-stable but not
L-stable. Second-order accurate. Rings on stiff modes
with sharp gradients (negative amplification factor).
- ``1.0``: Backward Euler, L-stable, monotone for the
diffusive flux. First-order accurate. Recommended when
SLCN+CN ringing dominates the discretisation error.
- ``0.0``: Forward Euler — unstable for stiff diffusion;
included for completeness.
Notes
-----
- The diffusivity :math:`\kappa` is set via the ``constitutive_model`` property
- Sources :math:`f` can be any sympy expression involving mesh/swarm variables
References
----------
Spiegelman, M., & Katz, R. F. (2006). A semi-Lagrangian Crank-Nicolson
algorithm for the numerical solution of advection-diffusion problems.
*Geochemistry, Geophysics, Geosystems*, 7(4).
https://doi.org/10.1029/2005GC001073
See Also
--------
SNES_Diffusion : Pure diffusion solver without advection.
SNES_Navier_Stokes : Full momentum advection-diffusion.
"""
def _object_viewer(self):
from IPython.display import Latex, Markdown, display
super()._object_viewer()
## feedback on this instance
display(Latex(r"$\quad\mathrm{u} = $ " + self.u.sym._repr_latex_()))
display(Latex(r"$\quad\mathbf{v} = $ " + self._V_fn._repr_latex_()))
display(Latex(r"$\quad\Delta t = $ " + self.delta_t._repr_latex_()))
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
u_Field: uw.discretisation.MeshVariable,
V_fn: Union[uw.discretisation.MeshVariable, sympy.Basic], # Should be a sympy function
order: int = 1,
restore_points_func: Callable = None,
verbose=False,
DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
monotone_mode: Optional[str] = None,
theta: float = 0.5,
):
## Parent class will set up default values etc
super().__init__(
mesh,
u_Field,
u_Field.degree,
verbose,
DuDt=DuDt,
DFDt=DFDt,
)
if isinstance(V_fn, uw.discretisation.MeshVariable):
self._V_fn = V_fn.sym
else:
self._V_fn = V_fn
# default values for properties
self.f = sympy.Matrix.zeros(1, 1)
self._constitutive_model = None
# These are unique to the advection solver
self._delta_t = expression(R"\Delta t", 0, "Physically motivated timestep")
self._needs_function_rewire = True
self.restore_points_to_domain_func = restore_points_func
### Setup the history terms ... This version should not build anything
### by default - it's the template / skeleton
## NB - Smoothing is generally required for stability. 0.0001 is effective
## at the various resolutions tested.
if DuDt is None:
self.Unknowns.DuDt = SemiLagrangian_DDt(
self.mesh,
u_Field.sym, # Symbolic expression - SemiLagrangian evaluates this at each update
self._V_fn,
vtype=uw.VarType.SCALAR,
degree=u_Field.degree,
continuous=u_Field.continuous,
varsymbol=u_Field.symbol,
verbose=verbose,
bcs=self.essential_bcs,
order=1,
smoothing=0.0,
monotone_mode=monotone_mode,
theta=theta,
)
else:
# validation
if order is None:
order = DuDt.order
else:
if DuDt.order < order:
raise RuntimeError(
f"DuDt supplied is order {DuDt.order} but order requested is {order}"
)
self.Unknowns.DuDt = DuDt
self.Unknowns.DFDt = SemiLagrangian_DDt(
self.mesh,
sympy.Matrix([[0] * self.mesh.dim]), # Actual function is not defined at this point
self._V_fn,
vtype=uw.VarType.VECTOR,
degree=u_Field.degree,
continuous=True,
# The default is now to match the above and avoid
# any use of projection.
# swarm_degree=u_Field.degree - 1,
# swarm_continuous=False,
varsymbol=rf"{{F[ {self.u.symbol} ] }}",
verbose=verbose,
bcs=None,
order=order,
smoothing=0.0,
monotone_mode=monotone_mode,
theta=theta,
)
return
@property
def F0(self):
"""Pointwise source term including time derivative."""
f0 = expression(
r"f_0 \left( \mathbf{u} \right)",
-self.f + self.DuDt.bdf(0) / self.delta_t,
"Poisson pointwise force term: f_0(u)",
)
# backward compatibility
self._f0 = f0
return f0
@property
def F1(self):
"""Pointwise diffusive flux term (Adams-Moulton integration)."""
F1_val = expression(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
self.DFDt.adams_moulton_flux(),
"Poisson pointwise flux term: F_1(u)",
)
# backward compatibility
self._f1 = F1_val
return F1_val
[docs]
def adv_diff_slcn_problem_description(self):
"""Build residual terms for advection-diffusion FEM assembly."""
# f0 residual term
self._f0 = self.F0.sym
# f1 residual term
self._f1 = self.F1.sym
return
@property
def f(self):
r"""Source term for the advection-diffusion equation.
The source :math:`f` appears on the right-hand side:
.. math::
\frac{\partial u}{\partial t} + \mathbf{V} \cdot \nabla u
= \nabla \cdot (\kappa \nabla u) + f
Returns
-------
sympy.Matrix
Source term expression.
"""
return self._f
@f.setter
def f(self, value):
"""Set the volumetric source term."""
self._needs_function_rewire = True
self._f = sympy.Matrix((value,))
@property
def V_fn(self):
r"""Velocity field for advection.
The advection velocity :math:`\mathbf{V}` transports the scalar
field :math:`u`. Can be a MeshVariable or symbolic expression.
Returns
-------
sympy.Matrix
Velocity vector expression.
"""
return self._V_fn
@V_fn.setter
def V_fn(self, value):
"""
Set the velocity function for advection.
Parameters:
-----------
value : uw.discretisation.MeshVariable or sympy.Basic
Velocity field as either a MeshVariable or sympy expression
"""
self.is_setup = False # Mark as needing setup when velocity changes
if isinstance(value, uw.discretisation.MeshVariable):
self._V_fn = value.sym
else:
self._V_fn = value
@property
def delta_t(self):
r"""Timestep for time integration.
The timestep :math:`\Delta t` controls the temporal discretization.
For explicit advection, this should satisfy the CFL condition:
.. math::
\Delta t < \frac{h}{|\mathbf{V}|}
where :math:`h` is the element size and :math:`|\mathbf{V}|` is the
velocity magnitude.
Returns
-------
UWexpression
Timestep value.
See Also
--------
estimate_dt : Computes a stable timestep automatically.
"""
return self._delta_t
@delta_t.setter
def delta_t(self, value):
"""Set the timestep (handles unit conversion if provided)."""
# Note: comparison must handle potential UWexpressions / UWQuantities
# Use .data or float() to get numeric values for stable comparison
try:
old_dt = float(self._delta_t.data)
new_dt = float(value.data) if hasattr(value, 'data') else float(value)
if np.isclose(old_dt, new_dt, rtol=1e-12, atol=1e-15):
return
except:
pass
self._needs_function_rewire = True
# Handle Pint Quantities with time dimensions
if hasattr(value, "dimensionality"):
# This is a Pint Quantity - check if it has time dimensions
try:
from ..scaling import units as ureg
time_dim = ureg.second.dimensionality
if value.dimensionality == time_dim:
# Convert physical time to nondimensional using model time scale
model = uw.get_default_model()
if model and hasattr(model, "fundamental_scales") and model.fundamental_scales:
time_scale = model.fundamental_scales.get("time")
if time_scale is not None:
# Physical time / time scale = nondimensional time
# Must use to_reduced_units() to convert both quantities
# to same base units before extracting magnitude.
# Otherwise Pint keeps different unit bases (megayear/second)
# and .magnitude returns the unconverted number!
result = value / time_scale
# Get the internal Pint quantity for proper unit conversion
if hasattr(result, "_pint_qty"):
# UWQuantity - access internal Pint quantity
pint_result = result._pint_qty.to_reduced_units()
elif hasattr(result, "to_reduced_units"):
# Raw Pint Quantity
pint_result = result.to_reduced_units()
else:
pint_result = result
# Extract the dimensionless magnitude
if hasattr(pint_result, "magnitude"):
value = float(pint_result.magnitude)
else:
value = float(pint_result)
except Exception:
pass # If anything fails, try to use value as-is
self._delta_t.sym = value
[docs]
@timing.routine_timer_decorator
def estimate_dt(self, direction_aware: bool = False):
r"""
Estimate an appropriate timestep for the advection-diffusion solver.
This is an implicit solver so the returned :math:`\delta t` is the
minimum of:
- :math:`\delta t_{\textrm{diff}}`: typical time for diffusion across an element
- :math:`\delta t_{\textrm{adv}}`: typical element-crossing time for a fluid parcel
Parameters
----------
direction_aware : bool, default False
If True, the advective dt uses the per-cell extent
*along the local velocity direction* — `h_eff_c =
max_i(s_i) - min_i(s_i)` where `s_i = (x_i -
centroid) · v̂` over the cell vertices. This is the
distance material actually traverses through the cell
per unit |v|, and is **always ≥ the isotropic
mesh._radii estimate**, by 1.5–3× for equant cells
(geometric factor) and up to ~10× for cells that the
mover has stretched along the flow direction. On
adapted meshes the gain is substantial; on uniform
meshes it's the geometric factor only. Off by
default to preserve historical behaviour; safe to
enable everywhere once validated.
Returns
-------
pint.Quantity or float
The recommended timestep with physical time units if a model
with reference scales is available, otherwise nondimensional.
"""
### required modules
from mpi4py import MPI
# Use the unified .K property from the constitutive model
# This provides diffusivity for diffusion models
K = self.constitutive_model.K
# Evaluate the diffusivity (handles constant and spatially-varying cases)
if isinstance(K, sympy.Expr) or hasattr(K, 'sym'):
K_sym = K.sym if hasattr(K, 'sym') else K
if uw.function.fn_is_constant_expr(K_sym):
diffusivity = uw.function.evaluate(
K_sym,
np.zeros((1, self.mesh.dim)),
)
else:
diffusivity = uw.function.evaluate(
sympy.sympify(K_sym),
self.mesh._centroids,
self.mesh.N,
)
diffusivity = diffusivity.max()
else:
diffusivity = K
# If diffusivity is unit-aware (UnitAwareArray), nondimensionalise it to get
# consistent nondimensional values that match mesh._radii
# Note: .magnitude returns physical units, which would be wrong here
if hasattr(diffusivity, "units") and diffusivity.units is not None:
diffusivity = uw.non_dimensionalise(diffusivity)
elif hasattr(diffusivity, "magnitude"):
# Plain UWQuantity without units context - use magnitude
diffusivity = diffusivity.magnitude
# Ensure diffusivity is a plain numpy scalar
max_diffusivity = float(np.asarray(diffusivity).max())
## get global max diffusivity value
comm = uw.mpi.comm
diffusivity_glob = comm.allreduce(max_diffusivity, op=MPI.MAX)
### get the velocity values at element centroids (nondimensional)
vel = uw.function.evaluate(
self.V_fn,
self.mesh._centroids,
)
# If vel is unit-aware (UnitAwareArray), nondimensionalise it to get
# consistent nondimensional values that match mesh._radii
# Note: .magnitude returns physical units, which would be wrong here
if hasattr(vel, "units") and vel.units is not None:
vel = uw.non_dimensionalise(vel)
elif hasattr(vel, "magnitude"):
# Plain UWQuantity without units context - use magnitude
vel = vel.magnitude
# Ensure vel is a plain numpy array
vel = np.asarray(vel)
# Squeeze out any singleton dimensions from evaluate (shape: N,1,dim -> N,dim)
vel = np.squeeze(vel)
# Handle edge case of single element (ensure 2D)
if vel.ndim == 1:
vel = vel.reshape(1, -1)
# Get per-element velocity magnitudes
vel_magnitudes = np.linalg.norm(vel, axis=1)
# Get per-element radii (characteristic element size)
element_radii = self.mesh._radii
## estimate dt of adv and diff components using per-element approach
## dt_adv_i = h_i / |v_i| for advection
## dt_diff_i = h_i^2 / κ for diffusion (using global κ for now)
# Per-element diffusive timestep (all elements use same diffusivity)
if diffusivity_glob > 0:
dt_diff_per_element = (element_radii ** 2) / diffusivity_glob
min_dt_diff_local = np.min(dt_diff_per_element) if len(dt_diff_per_element) > 0 else np.inf
else:
min_dt_diff_local = np.inf
# Per-element advective timestep — either isotropic
# (mesh._radii / |v|) or direction-aware (v-aligned cell
# extent / |v|).
if direction_aware:
# Per-cell vertex indices (triangle / tet).
from underworld3.meshing.smoothing import _tri_cells
tris = _tri_cells(self.mesh.dm)
if tris is None:
# Fall back to isotropic for non-triangle meshes.
h_per_element = element_radii
else:
coords = np.asarray(self.mesh.X.coords)
centroids = coords[tris].mean(axis=1)
# v-hat per cell (use centroid v we already have)
vhat = np.where(
vel_magnitudes[:, None] > 0,
vel / np.maximum(vel_magnitudes[:, None],
1.0e-30),
0.0)
D = coords[tris] - centroids[:, None, :]
# Signed projections along v̂ per cell vertex
s = np.einsum('cvd,cd->cv', D, vhat)
h_per_element = s.max(axis=1) - s.min(axis=1)
# Sanity-floor — for zero-velocity cells s=0
# ⇒ h_eff=0 ⇒ dt_adv=inf via the where below
h_per_element = np.maximum(
h_per_element, 0.0)
else:
h_per_element = element_radii
with np.errstate(divide='ignore', invalid='ignore'):
dt_adv_per_element = np.where(
vel_magnitudes > 0,
h_per_element / vel_magnitudes,
np.inf
)
min_dt_adv_local = np.min(dt_adv_per_element) if len(dt_adv_per_element) > 0 else np.inf
# Get global minimum timesteps (parallel-safe)
min_dt_diff_glob = comm.allreduce(min_dt_diff_local, op=MPI.MIN)
min_dt_adv_glob = comm.allreduce(min_dt_adv_local, op=MPI.MIN)
# Store for user inspection
self.dt_adv = min_dt_adv_glob if not np.isinf(min_dt_adv_glob) else 0.0
self.dt_diff = min_dt_diff_glob if not np.isinf(min_dt_diff_glob) else 0.0
# Take overall minimum (respecting infinity for zero velocity/diffusivity cases)
dt_estimate = min(min_dt_diff_glob, min_dt_adv_glob)
# If both are infinite (no velocity and no diffusivity), return infinity
if np.isinf(dt_estimate):
return np.inf
# Dimensionalise the result to physical time
try:
return uw.dimensionalise(np.squeeze(dt_estimate), {'[time]': 1})
except Exception:
# Fallback: return plain nondimensional number
return np.squeeze(dt_estimate)
[docs]
@timing.routine_timer_decorator
def solve(
self,
zero_init_guess: bool = True,
timestep: float = None,
_force_setup: bool = False,
_evalf=False,
verbose=False,
divergence_retries: int = 0,
):
"""
Generates solution to constructed system.
Params
------
zero_init_guess:
If `True`, a zero initial guess will be used for the
system solution. Otherwise, the current values of `self.u` will be used.
divergence_retries:
If SNES reports DIVERGED, retry with warm start up to this
many times. 0 preserves legacy behaviour.
"""
if timestep is not None and timestep != self.delta_t:
self.delta_t = timestep # this will force an initialisation because the functions need to be updated
if _force_setup:
self._needs_function_rewire = True
if not self.constitutive_model._solver_is_setup:
self._needs_function_rewire = True
self.DFDt.psi_fn = self.constitutive_model.flux.T
if not self.is_setup:
self._setup_pointwise_functions(verbose)
self._setup_discretisation(verbose)
self._setup_solver(verbose)
# Update History / Flux History terms
# SemiLagrange and Lagrange may have different sequencing.
self.DuDt.update_pre_solve(timestep, verbose=verbose, evalf=_evalf)
self.DFDt.update_pre_solve(timestep, verbose=verbose, evalf=_evalf)
super().solve(zero_init_guess, _force_setup,
divergence_retries=divergence_retries)
# Invalidate cached data views - PETSc may have replaced underlying buffers
# This ensures .data and .array properties return fresh data from PETSc
# Handle both EnhancedMeshVariable (has _base_var) and direct _MeshVariable
target_var = getattr(self.u, "_base_var", self.u)
if hasattr(target_var, "_canonical_data"):
target_var._canonical_data = None
self.DuDt.update_post_solve(timestep, verbose=verbose, evalf=_evalf)
self.DFDt.update_post_solve(timestep, verbose=verbose, evalf=_evalf)
self.is_setup = True
self.constitutive_model._solver_is_setup = True
return
[docs]
class SNES_Diffusion(SNES_Scalar):
r"""
Diffusion equation solver using mesh-based finite elements.
Solves the scalar diffusion equation:
.. math::
\underbrace{\frac{\partial u}{\partial t}}_{\dot{f}}
- \nabla \cdot \underbrace{\left[ \boldsymbol{\kappa} \nabla u
\right]}_{\mathbf{F}} = \underbrace{f}_{h}
The flux term :math:`\mathbf{F}` relates diffusive fluxes to gradients
in the unknown :math:`u`.
Parameters
----------
mesh : Mesh
The computational mesh.
u_Field : MeshVariable
Mesh variable for the diffusing scalar.
order : int, default=1
Time integration order.
theta : float, default=0.0
Time integration parameter (0=explicit, 0.5=Crank-Nicolson, 1=implicit).
evalf : bool, default=False
Numerically evaluate symbolic expressions during setup.
verbose : bool, default=False
Enable verbose output.
DuDt : Eulerian_DDt, SemiLagrangian_DDt, or Lagrangian_DDt, optional
Time derivative operator for the unknown.
DFDt : Eulerian_DDt, SemiLagrangian_DDt, or Lagrangian_DDt, optional
Time derivative operator for the flux.
Notes
-----
- The diffusivity :math:`\kappa` is set via the ``constitutive_model`` property
- Sources :math:`f` can be any sympy expression involving mesh/swarm variables
See Also
--------
SNES_AdvectionDiffusion : Adds advection transport.
SNES_Poisson : Steady-state diffusion (no time derivative).
"""
def _object_viewer(self):
from IPython.display import Latex, Markdown, display
super()._object_viewer()
## feedback on this instance
display(Latex(r"$\quad\mathrm{u} = $ " + self.u.sym._repr_latex_()))
display(Latex(r"$\quad\Delta t = $ " + self.delta_t._repr_latex_()))
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
u_Field: uw.discretisation.MeshVariable,
order: int = 1,
theta: float = 0.0,
evalf: Optional[bool] = False,
verbose=False,
DuDt: Union[Eulerian_DDt, SemiLagrangian_DDt, Lagrangian_DDt] = None,
DFDt: Union[Eulerian_DDt, SemiLagrangian_DDt, Lagrangian_DDt] = None,
):
## Parent class will set up default values etc
super().__init__(
mesh,
u_Field,
u_Field.degree,
verbose,
DuDt=DuDt,
DFDt=DFDt,
)
# default values for properties
self.f = sympy.Matrix.zeros(1, 1)
self._constitutive_model = None
self.theta = theta
# These are unique to the advection solver
self._delta_t = expression(R"\Delta t", 0, "Physically motivated timestep")
self._needs_function_rewire = True
### Setup the history terms ... This version should not build anything
### by default - it's the template / skeleton
## NB - Smoothing is generally required for stability. 0.0001 is effective
## at the various resolutions tested.
if DuDt is None:
self.Unknowns.DuDt = Eulerian_DDt(
self.mesh,
u_Field,
vtype=uw.VarType.SCALAR,
degree=u_Field.degree,
continuous=u_Field.continuous,
varsymbol=u_Field.symbol,
verbose=verbose,
evalf=evalf,
bcs=self.essential_bcs,
order=order,
smoothing=0.0,
)
else:
# validation
if order is None:
order = DuDt.order
else:
if DuDt.order < order:
raise RuntimeError(
f"DuDt supplied is order {DuDt.order} but order requested is {order}"
)
self.Unknowns.DuDt = DuDt
if DFDt is None:
self.Unknowns.DFDt = Symbolic_DDt(
sympy.Matrix([[0] * self.mesh.dim]),
varsymbol=rf"{{F[ {self.u.symbol} ] }}",
theta=theta,
bcs=None,
order=order,
)
### solution unable to solve after n timesteps, due to the projection of flux term (???)
# self.Unknowns.DFDt = Eulerian_DDt(
# self.mesh,
# sympy.Matrix(
# [[0] * self.mesh.dim]
# ), # Actual function is not defined at this point
# vtype=uw.VarType.VECTOR,
# degree=u_Field.degree,
# continuous=u_Field.continuous,
# varsymbol=rf"{{F[ {self.u.symbol} ] }}",
# theta=theta,
# evalf=evalf,
# verbose=verbose,
# bcs=None,
# order=order,
# smoothing=0.0,
# )
return
@property
def F0(self):
"""Pointwise source term including time derivative."""
f0 = expression(
r"f_0 \left( \mathbf{u} \right)",
-self.f + self.DuDt.bdf() / self.delta_t,
"Diffusion pointwise force term: f_0(u)",
)
# backward compatibility
self._f0 = f0
return f0
@property
def F1(self):
"""Pointwise diffusive flux term."""
F1_val = expression(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
self.DFDt.adams_moulton_flux(),
"Diffusion pointwise flux term: F_1(u)",
)
# backward compatibility
self._f1 = F1_val
return F1_val
@property
def f(self):
"""Source term for the diffusion equation."""
return self._f
@f.setter
def f(self, value):
"""Set the volumetric source term."""
self._needs_function_rewire = True
self._f = sympy.Matrix((value,))
@property
def delta_t(self):
"""Timestep for time integration."""
return self._delta_t
@delta_t.setter
def delta_t(self, value):
"""Set the timestep (handles unit conversion if provided)."""
# Note: comparison must handle potential UWexpressions / UWQuantities
# Use .data or float() to get numeric values for stable comparison
try:
old_dt = float(self._delta_t.data)
new_dt = float(value.data) if hasattr(value, 'data') else float(value)
if np.isclose(old_dt, new_dt, rtol=1e-12, atol=1e-15):
return
except:
pass
self._needs_function_rewire = True
# Handle Pint Quantities with time dimensions
if hasattr(value, "dimensionality"):
# This is a Pint Quantity - check if it has time dimensions
try:
from ..scaling import units as ureg
time_dim = ureg.second.dimensionality
if value.dimensionality == time_dim:
# Convert physical time to nondimensional using model time scale
model = uw.get_default_model()
if model and hasattr(model, "fundamental_scales") and model.fundamental_scales:
time_scale = model.fundamental_scales.get("time")
if time_scale is not None:
# Physical time / time scale = nondimensional time
# Must use to_reduced_units() to convert both quantities
# to same base units before extracting magnitude.
# Otherwise Pint keeps different unit bases (megayear/second)
# and .magnitude returns the unconverted number!
result = value / time_scale
# Get the internal Pint quantity for proper unit conversion
if hasattr(result, "_pint_qty"):
# UWQuantity - access internal Pint quantity
pint_result = result._pint_qty.to_reduced_units()
elif hasattr(result, "to_reduced_units"):
# Raw Pint Quantity
pint_result = result.to_reduced_units()
else:
pint_result = result
# Extract the dimensionless magnitude
if hasattr(pint_result, "magnitude"):
value = float(pint_result.magnitude)
else:
value = float(pint_result)
except Exception:
pass # If anything fails, try to use value as-is
self._delta_t.sym = value
[docs]
@timing.routine_timer_decorator
def estimate_dt(self):
r"""
Estimate an appropriate timestep for the diffusion solver.
This solver only has a diffusive component, so the returned
:math:`\delta t` is:
- :math:`\delta t_{\textrm{diff}}`: typical time for diffusion across an element
Returns
-------
pint.Quantity or float
The diffusive timestep with physical time units if a model
with reference scales is available, otherwise nondimensional.
"""
### required modules
from mpi4py import MPI
# Use the unified .K property from the constitutive model
# This provides diffusivity for diffusion models
K = self.constitutive_model.K
# Evaluate the diffusivity (handles constant and spatially-varying cases)
if isinstance(K, sympy.Expr) or hasattr(K, 'sym'):
K_sym = K.sym if hasattr(K, 'sym') else K
if uw.function.fn_is_constant_expr(K_sym):
diffusivity = uw.function.evaluate(
K_sym,
np.zeros((1, self.mesh.dim)),
)
else:
diffusivity = uw.function.evaluate(
sympy.sympify(K_sym),
self.mesh._centroids,
self.mesh.N,
)
diffusivity = diffusivity.max()
else:
diffusivity = K
# If diffusivity is unit-aware (UnitAwareArray), nondimensionalise it to get
# consistent nondimensional values that match mesh._radii
# Note: .magnitude returns physical units, which would be wrong here
if hasattr(diffusivity, "units") and diffusivity.units is not None:
diffusivity = uw.non_dimensionalise(diffusivity)
elif hasattr(diffusivity, "magnitude"):
# Plain UWQuantity without units context - use magnitude
diffusivity = diffusivity.magnitude
# Ensure diffusivity is a plain numpy scalar
diffusivity = float(np.asarray(diffusivity).max())
## get global max diffusivity value
comm = uw.mpi.comm
diffusivity_glob = comm.allreduce(diffusivity, op=MPI.MAX)
## get mesh spacing (nondimensional, consistent with diffusivity)
min_dx = self.mesh.get_min_radius()
## estimate dt of diff component (all in nondimensional units)
self.dt_diff = 0.0
if diffusivity_glob != 0.0:
dt_diff = (min_dx**2) / diffusivity_glob
self.dt_diff = dt_diff
else:
dt_diff = np.inf
dt_estimate = dt_diff
# Apply unit-aware scaling if the solver has unit-aware variables
return _apply_unit_aware_scaling(np.squeeze(dt_estimate), self.u, self.mesh)
[docs]
@timing.routine_timer_decorator
def solve(
self,
zero_init_guess: bool = True,
timestep: float = None,
evalf: bool = False,
_force_setup: bool = False,
verbose=False,
divergence_retries: int = 0,
):
"""
Generates solution to constructed system.
Params
------
zero_init_guess:
If `True`, a zero initial guess will be used for the
system solution. Otherwise, the current values of `self.u` will be used.
divergence_retries:
If SNES reports DIVERGED, retry with warm start up to this
many times. 0 preserves legacy behaviour.
"""
if timestep is not None and timestep != self.delta_t:
self.delta_t = timestep # this will force an initialisation because the functions need to be updated
if _force_setup:
self._needs_function_rewire = True
if not self.constitutive_model._solver_is_setup:
self._needs_function_rewire = True
self.DFDt.psi_fn = self.constitutive_model.flux.T
# self._flux = self.constitutive_model.flux.T
# self._flux_star = self._flux.copy()
if not self.is_setup:
self._setup_pointwise_functions(verbose)
self._setup_discretisation(verbose)
self._setup_solver(verbose)
# Update History / Flux History terms
# SemiLagrange and Lagrange may have different sequencing.
self.DuDt.update_pre_solve(timestep, evalf=evalf, verbose=verbose)
self.DFDt.update_pre_solve(timestep, evalf=evalf, verbose=verbose)
super().solve(zero_init_guess, _force_setup,
divergence_retries=divergence_retries)
# Invalidate cached data views - PETSc may have replaced underlying buffers
target_var = getattr(self.u, "_base_var", self.u)
if hasattr(target_var, "_canonical_data"):
target_var._canonical_data = None
self.DuDt.update_post_solve(timestep, evalf=evalf, verbose=verbose)
self.DFDt.update_post_solve(timestep, evalf=evalf, verbose=verbose)
self.is_setup = True
self.constitutive_model._solver_is_setup = True
return
# This one is already updated to work with the Lagrange D_Dt
[docs]
class SNES_NavierStokes(SNES_Stokes_SaddlePt):
r"""
Navier-Stokes equation solver with momentum advection.
Provides a solver for the Navier-Stokes (vector advection-diffusion) equation
similar to the Semi-Lagrange Crank-Nicolson method (Spiegelman & Katz, 2006)
but using distributed upstream sampling from a swarm variable.
.. math::
\underbrace{\frac{\partial \mathbf{u}}{\partial t}
+ \left( \mathbf{u} \cdot \nabla \right) \mathbf{u}}_{\dot{\mathbf{u}}}
- \nabla \cdot \underbrace{\left[ \frac{\boldsymbol{\eta}}{2}
\left( \nabla \mathbf{u} + \nabla \mathbf{u}^T \right)
- p \mathbf{I} \right]}_{\mathbf{F}} = \underbrace{\mathbf{f}}_{\mathbf{h}}
The flux term :math:`\mathbf{F}` relates viscous stresses to velocity gradients.
Advective momentum transport is handled in the :math:`\dot{\mathbf{u}}` term.
The time derivative :math:`\dot{\mathbf{u}}` involves upstream sampling to find
:math:`\mathbf{u}^*`, representing velocity at the start of the timestep. This is
achieved using a swarm variable that carries history information along flow paths.
Parameters
----------
mesh : Mesh
The computational mesh.
velocityField : MeshVariable
Mesh variable for velocity.
pressureField : MeshVariable
Mesh variable for pressure.
rho : float or sympy.Expr
Fluid density.
order : int, default=1
Time integration order.
theta : float, default=0.5
Time integration parameter.
p_continuous : bool, default=True
If False, use discontinuous pressure elements.
verbose : bool, default=False
Enable verbose output.
DuDt : SemiLagrangian_DDt or Lagrangian_DDt, optional
Time derivative operator for velocity.
DFDt : SemiLagrangian_DDt or Lagrangian_DDt, optional
Time derivative operator for stress.
Notes
-----
- The viscosity :math:`\eta` is set via the ``constitutive_model`` property
- High-order shape functions (cubic or higher) are recommended for accurate
history term interpolation
- The user must supply and update the swarm variable representing :math:`\mathbf{u}^*`
References
----------
Spiegelman, M., & Katz, R. F. (2006). A semi-Lagrangian Crank-Nicolson
algorithm for the numerical solution of advection-diffusion problems.
*Geochemistry, Geophysics, Geosystems*, 7(4).
https://doi.org/10.1029/2005GC001073
See Also
--------
SNES_Stokes : Steady-state Stokes flow (no inertia).
SNES_AdvectionDiffusion : Scalar advection-diffusion.
"""
def _object_viewer(self):
from IPython.display import Latex, Markdown, display
super()._object_viewer()
## feedback on this instance
display(Latex(r"$\quad\mathrm{u} = $ " + self.u.sym._repr_latex_()))
display(Latex(r"$\quad\mathbf{p} = $ " + self.p.sym._repr_latex_()))
display(Latex(r"$\quad\Delta t = $ " + self.delta_t._repr_latex_()))
display(Latex(rf"$\quad\rho = $" + self.rho._repr_latex_()))
[docs]
@timing.routine_timer_decorator
def __init__(
self,
mesh: uw.discretisation.Mesh,
velocityField: uw.discretisation.MeshVariable,
pressureField: uw.discretisation.MeshVariable,
rho: Optional[float] = 0.0,
restore_points_func: Callable = None,
order: Optional[int] = 2,
flux_order: Optional[int] = None,
p_continuous: Optional[bool] = False,
verbose: Optional[bool] = False,
DuDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
DFDt: Union[SemiLagrangian_DDt, Lagrangian_DDt] = None,
):
## Parent class will set up default values and load u_Field into the solver
super().__init__(
mesh,
velocityField,
pressureField,
order,
p_continuous,
verbose,
DuDt=DuDt,
DFDt=DFDt,
)
# These are unique to the advection solver
self._delta_t = expression(r"\Delta t", sympy.oo, "Navier-Stokes timestep")
self._needs_function_rewire = True
self._rho = expression(R"{\uprho}", rho, "Density")
self._first_solve = True
self._order = order
self._flux_order = flux_order # None means follow effective_order
self._penalty = expression(R"{\uplambda}", 0, "Incompressibility Penalty")
self.restore_points_to_domain_func = restore_points_func
self._bodyforce = sympy.Matrix([[0] * self.mesh.dim]).T
self._constitutive_model = None
# self._E = self.mesh.vector.strain_tensor(self.u.sym)
self._Estar = None
self._constraints = sympy.Matrix((self.div_u,))
### sets up DuDt and DFDt
## ._setup_history_terms()
# If DuDt is not provided, then we can build a SLCN version
if self.Unknowns.DuDt is None:
self.Unknowns.DuDt = uw.systems.ddt.SemiLagrangian(
self.mesh,
self.u.sym, # Symbolic expression - SemiLagrangian evaluates this at each update
self.u.sym,
vtype=uw.VarType.VECTOR,
degree=self.u.degree,
continuous=self.u.continuous,
varsymbol=self.u.symbol,
verbose=self.verbose,
bcs=self.essential_bcs,
order=self._order,
smoothing=0.0001,
)
# F (at least for N-S) is a nodal point variable so there is no benefit
# to treating it as a swarm variable. We'll define and use our own SL tracker
# as we do in the SLCN version. We'll leave the option for an over-ride.
#
# Maybe u.degree-1. The scalar equivalent seems to show
# little benefit from specific choices here other than
# discontinuous flux variables amplifying instabilities.
self.Unknowns.DFDt = uw.systems.ddt.SemiLagrangian(
self.mesh,
sympy.Matrix.zeros(self.mesh.dim, self.mesh.dim),
self.u.sym,
vtype=uw.VarType.SYM_TENSOR,
degree=self.u.degree,
continuous=self.u.continuous,
varsymbol=rf"{{ F[ {self.u.symbol} ] }}",
verbose=self.verbose,
bcs=None,
order=self._order,
)
## Add in the history terms provided ...
return
@property
def F0(self):
"""Pointwise momentum source term (body force + inertia)."""
DuDt = self.Unknowns.DuDt
f0 = expression(
r"\mathbf{f}_0\left( \mathbf{u} \right)",
-self.bodyforce + self.rho * DuDt.bdf() / self.delta_t,
"NStokes pointwise force term: f_0(u)",
)
self._u_f0 = f0
return f0
@property
def F1(self):
"""Pointwise stress flux term (viscous + pressure)."""
dim = self.mesh.dim
DFDt = self.Unknowns.DFDt
if DFDt is not None:
# We can flag to only do this if the constitutive model has been updated
DFDt.psi_fn = self._constitutive_model.flux.T
F1 = expression(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
DFDt.adams_moulton_flux()
- sympy.eye(self.mesh.dim) * (self.p.sym[0])
+ self.penalty * self.div_u * sympy.eye(dim),
"NStokes pointwise flux term: F_1(u)",
)
# Is the else condition useful - other than to prevent a crash ?
# Yes, because then it can just live on the Stokes solver ...
else:
F1 = expression(
r"\mathbf{F}_1\left( \mathbf{u} \right)",
self._constitutive_model.flux.T - sympy.eye(self.mesh.dim) * (self.p.sym[0]),
"NStokes pressure gradient term: F_1(u) - No Flux history provided",
)
self._u_f1 = F1
return F1
@property
def PF0(self):
"""Pointwise constraint term (continuity equation)."""
dim = self.mesh.dim
f0 = expression(
r"\mathbf{F}_1\left( \mathbf{p} \right)",
sympy.Matrix((self.constraints)),
"NStokes pointwise flux term: f_0(p)",
)
self._p_f0 = f0
return f0
## Deprecate this function
[docs]
def navier_stokes_problem_description(self):
"""Build residual terms for Navier-Stokes FEM assembly (deprecated)."""
# f0 residual term
self._u_f0 = self.F0.sym
# f1 residual term
self._u_f1 = self.F1.sym
# p1 residual term
self._p_f0 = self.PF0.sym
return
@property
def delta_t(self):
"""Timestep for time integration."""
return self._delta_t
@delta_t.setter
def delta_t(self, value):
"""Set the timestep value."""
self._needs_function_rewire = True
self._delta_t.sym = value
@property
def rho(self):
"""Fluid density."""
return self._rho
@rho.setter
def rho(self, value):
"""Set the fluid density."""
self._needs_function_rewire = True
self._rho.sym = value
@property
def f(self):
"""Source term for the momentum equation."""
return self._f
@f.setter
def f(self, value):
"""Set the volumetric source term."""
self._needs_function_rewire = True
self._f = sympy.Matrix((value,))
@property
def div_u(self):
r"""Velocity divergence: :math:`\nabla \cdot \mathbf{u} = \mathrm{tr}(\dot{\varepsilon})`."""
E = self.strainrate
divergence = E.trace()
return divergence
@property
def strainrate(self):
r"""Symmetric strain rate tensor (with 1/2 factor).
.. math::
\dot{\varepsilon}_{ij} = \frac{1}{2}\left(\frac{\partial u_i}{\partial x_j}
+ \frac{\partial u_j}{\partial x_i}\right)
"""
return sympy.Matrix(self.Unknowns.E)
@property
def DuDt(self):
"""Time derivative operator for velocity."""
return self.Unknowns.DuDt
@DuDt.setter
def DuDt(
self,
DuDt_value: Union[SemiLagrangian_DDt, Lagrangian_DDt],
):
"""Set the time derivative operator for velocity."""
self.Unknowns.DuDt = DuDt_value
self._solver_is_setup = False
@property
def DFDt(self):
"""Time derivative operator for stress flux."""
return self.Unknowns.DFDt
@property
def constraints(self):
"""Constraint equation (typically incompressibility)."""
return self._constraints
@constraints.setter
def constraints(self, constraints_matrix):
"""Set the constraint equation."""
self._is_setup = False
symval = sympify(constraints_matrix)
self._constraints = symval
@property
def bodyforce(self):
"""Body force vector (e.g., gravity)."""
return self._bodyforce
@bodyforce.setter
def bodyforce(self, value):
"""Set the body force vector."""
self._needs_function_rewire = True
self._bodyforce = self.mesh.vector.to_matrix(value)
@property
def saddle_preconditioner(self):
"""Preconditioner for the Schur complement."""
return self._saddle_preconditioner
@saddle_preconditioner.setter
def saddle_preconditioner(self, value):
"""Set the Schur complement preconditioner."""
self._needs_function_rewire = True
symval = sympify(value)
self._saddle_preconditioner = symval
@property
def penalty(self):
"""Augmented Lagrangian penalty parameter."""
return self._penalty
@penalty.setter
def penalty(self, value):
"""Set the augmented Lagrangian penalty parameter."""
self._needs_function_rewire = True
self._penalty.sym = value
[docs]
@timing.routine_timer_decorator
@memprobe.instrument("NavierStokes.solve")
def solve(
self,
zero_init_guess: bool = True,
timestep: float = None,
_force_setup: bool = False,
verbose=False,
_evalf=False,
order=None,
divergence_retries: int = 0,
):
"""
Generates solution to constructed system.
Params
------
zero_init_guess:
If `True`, a zero initial guess will be used for the
system solution. Otherwise, the current values of `self.u` will be used.
divergence_retries:
If SNES reports DIVERGED, retry with warm start up to this
many times. 0 preserves legacy behaviour.
"""
if order is None or order > self._order:
order = self._order
if timestep is not None and timestep != self.delta_t:
self.delta_t = timestep # this will force an initialisation because the functions need to be updated
if _force_setup:
self._needs_function_rewire = True
if not self.constitutive_model._solver_is_setup:
self._needs_function_rewire = True
self.DFDt.psi_fn = self.constitutive_model.flux.T
if not self.is_setup:
self._setup_pointwise_functions(verbose)
self._setup_discretisation(verbose)
self._setup_solver(verbose)
if uw.mpi.rank == 0 and verbose:
print(f"NS solver - pre-solve DuDt update", flush=True)
# Update SemiLagrange Flux terms
self.DuDt.update_pre_solve(timestep, verbose=verbose, evalf=_evalf)
self.DFDt.update_pre_solve(timestep, verbose=verbose, evalf=_evalf)
# Override AM coefficients if flux_order is explicitly set
if self._flux_order is not None:
from underworld3.systems.ddt import _update_am_values
fo = min(self._flux_order, self.DFDt.effective_order)
_update_am_values(self.DFDt._am_coeffs, fo, 0.5)
if uw.mpi.rank == 0 and verbose:
print(f"NS solver - solve Stokes flow", flush=True)
super().solve(
zero_init_guess,
_force_setup=_force_setup,
verbose=verbose,
picard=0,
divergence_retries=divergence_retries,
)
if uw.mpi.rank == 0 and verbose:
print(f"NS solver - post-solve DuDt update", flush=True)
self.DuDt.update_post_solve(timestep, verbose=verbose, evalf=_evalf)
self.DFDt.update_post_solve(timestep, verbose=verbose, evalf=_evalf)
self.is_setup = True
self.constitutive_model._solver_is_setup = True
return
[docs]
@timing.routine_timer_decorator
def estimate_dt(self):
r"""
Estimate an appropriate timestep for the Navier-Stokes solver.
This is an implicit solver, so the returned :math:`\delta t` should
be interpreted as:
- :math:`\delta t_{\textrm{diff}}`: typical time for vorticity diffusion across an element
- :math:`\delta t_{\textrm{adv}}`: typical element-crossing time for a fluid parcel
The Navier-Stokes equations include momentum diffusion via kinematic
viscosity :math:`\nu = \eta/\rho`, so the diffusive timestep is computed
from this quantity.
Returns
-------
tuple
(:math:`\delta t_{\textrm{diff}}`, :math:`\delta t_{\textrm{adv}}`)
"""
### required modules
from mpi4py import MPI
# For Navier-Stokes, diffusivity is the kinematic viscosity: ν = η/ρ
# Use the unified .K property from the constitutive model (returns viscosity)
K = self.constitutive_model.K
# Evaluate the viscosity (handles constant and spatially-varying cases)
if isinstance(K, sympy.Expr) or hasattr(K, 'sym'):
K_sym = K.sym if hasattr(K, 'sym') else K
if uw.function.fn_is_constant_expr(K_sym):
diffusivity = uw.function.evaluate(
K_sym,
np.zeros((1, self.mesh.dim)),
)
else:
diffusivity = uw.function.evaluate(
sympy.sympify(K_sym),
self.mesh._centroids,
self.mesh.N,
)
diffusivity = diffusivity.max()
else:
diffusivity = K
# If diffusivity is unit-aware (UnitAwareArray), nondimensionalise it to get
# consistent nondimensional values that match mesh._radii
# Note: .magnitude returns physical units, which would be wrong here
if hasattr(diffusivity, "units") and diffusivity.units is not None:
diffusivity = uw.non_dimensionalise(diffusivity)
elif hasattr(diffusivity, "magnitude"):
# Plain UWQuantity without units context - use magnitude
diffusivity = diffusivity.magnitude
# Ensure diffusivity is a plain numpy scalar
max_diffusivity = float(np.asarray(diffusivity).max())
## get global max diffusivity value
comm = uw.mpi.comm
diffusivity_glob = comm.allreduce(max_diffusivity, op=MPI.MAX)
### get the velocity values
vel = uw.function.evaluate(
self.u.sym,
self.mesh._centroids,
self.mesh.N,
)
# If vel is unit-aware (UnitAwareArray), nondimensionalise it to get
# consistent nondimensional values that match mesh._radii
# Note: .magnitude returns physical units, which would be wrong here
if hasattr(vel, "units") and vel.units is not None:
vel = uw.non_dimensionalise(vel)
elif hasattr(vel, "magnitude"):
# Plain UWQuantity without units context - use magnitude
vel = vel.magnitude
# Ensure vel is a plain numpy array
vel = np.asarray(vel)
### get global velocity from velocity field
max_magvel = np.linalg.norm(vel, axis=1).max()
max_magvel_glob = comm.allreduce(max_magvel, op=MPI.MAX)
## get radius
min_dx = self.mesh.get_min_radius()
## estimate dt of adv and diff components
dt_diff = np.inf
dt_adv = np.inf
if diffusivity_glob != 0.0:
dt_diff = (min_dx**2) / diffusivity_glob
if max_magvel_glob != 0.0:
dt_adv = min_dx / max_magvel_glob
return (
_apply_unit_aware_scaling(np.squeeze(dt_diff), self.u, self.mesh),
_apply_unit_aware_scaling(np.squeeze(dt_adv), self.u, self.mesh),
)