Source code for underworld3.swarm

r"""
Particle swarm management for Lagrangian tracking.

This module provides particle swarm (point cloud) data structures for
tracking material properties through deformation. Swarms enable Lagrangian
representations of material history, composition, and other quantities
that move with the flow.

**SwarmType** -- PETSc swarm type specification (BASIC or PIC).

**SwarmVariable** -- Variable storing values at particle locations with
mesh-based proxy for use in symbolic expressions.

**IndexSwarmVariable** -- Integer-valued swarm variable for material indexing.

The swarm module integrates with PETSc's DMSwarm for parallel particle
management and provides automatic population, advection, and repopulation
capabilities.

See Also
--------
underworld3.discretisation : Mesh discretisation classes.
underworld3.systems.ddt : Time derivative schemes using swarms.
"""
from posixpath import pardir
import petsc4py.PETSc as PETSc

import numpy as np
import sympy
import h5py
import os
import warnings
import weakref
from typing import Optional, Tuple

import underworld3 as uw
from underworld3.utilities._api_tools import Stateful
from underworld3.utilities._api_tools import uw_object
from underworld3.utilities.mathematical_mixin import MathematicalMixin

import underworld3.timing as timing

comm = uw.mpi.comm

from enum import Enum


# We can grab this type from the PETSc module
class SwarmType(Enum):
    """
    PETSc swarm type specification.

    Determines how particles are managed by PETSc's DMSwarm infrastructure.

    Attributes
    ----------
    DMSWARM_BASIC : int
        Basic point cloud without mesh association.
    DMSWARM_PIC : int
        Particle-in-cell mode with automatic mesh cell tracking.
        Particles are migrated between MPI ranks as they move across
        cell boundaries.
    """

    DMSWARM_BASIC = 0
    DMSWARM_PIC = 1


# We can grab this type from the PETSc module
# SwarmPICLayout has been moved to pic_swarm.py


# Note - much of the setup is necessarily the same as the MeshVariable
# and the duplication should be removed.

from underworld3.utilities.dimensionality_mixin import DimensionalityMixin


[docs] class SwarmVariable(DimensionalityMixin, MathematicalMixin, Stateful, uw_object): r""" Variable supported by a particle swarm (point cloud). A SwarmVariable stores values at discrete particle locations and provides a mesh-based proxy representation for use in symbolic expressions. This enables Lagrangian tracking of material properties through deformation. Parameters ---------- name : str Identifier for this variable (must be unique within the swarm). swarm : Swarm The supporting particle swarm. size : int or tuple, optional Shape specification: int for vectors, tuple for matrices. If None, inferred from ``vtype``. vtype : VarType, optional Variable type (SCALAR, VECTOR, TENSOR, SYM_TENSOR, MATRIX). If None, inferred from ``size``. dtype : type, default=float Data type for storage (float or int). proxy_degree : int, default=1 Polynomial degree for the mesh proxy variable. proxy_continuous : bool, default=True Whether the proxy uses continuous (True) or discontinuous (False) interpolation. varsymbol : str, optional LaTeX symbol for display. Defaults to ``name``. rebuild_on_cycle : bool, default=True If True, rebuild the proxy when particles cycle through periodic boundaries. Recommended for continuous fields. units : str or pint.Unit, optional Physical units for this variable (e.g., 'kelvin', 'Pa'). Requires reference quantities to be set on the model. See Also -------- MeshVariable : Variable supported by mesh nodes. Swarm : Container for particle locations. Examples -------- Create a temperature field on a swarm: >>> swarm = uw.swarm.Swarm(mesh) >>> T = swarm.add_variable("T", size=1, vtype=uw.VarType.SCALAR) >>> T.data[:] = 1600.0 # Set initial temperature Create a velocity field: >>> v = swarm.add_variable("v", size=mesh.dim, vtype=uw.VarType.VECTOR) Notes ----- SwarmVariables are essential for tracking material properties that advect with the flow. The mesh proxy enables their use in finite element formulations while particle storage preserves Lagrangian history. """
[docs] @timing.routine_timer_decorator def __init__( self, name, swarm, size=None, # only needed if MATRIX type vtype=None, dtype=float, proxy_degree=1, proxy_continuous=True, _register=True, _proxy=True, _nn_proxy=False, varsymbol=None, rebuild_on_cycle=True, units=None, units_backend=None, ): if name in swarm.vars.keys(): raise ValueError("Variable with name {} already exists on swarm.".format(name)) import re import sympy import math if varsymbol is None: varsymbol = name self.name = name self.clean_name = re.sub(r"[^a-zA-Z0-9_]", "", name) self.symbol = varsymbol self._swarm_ref = weakref.ref(swarm) self.shape = size mesh = swarm.mesh if vtype == None: if isinstance(size, int) and size == 1: vtype = uw.VarType.SCALAR elif isinstance(size, int) and size == mesh.dim: vtype = uw.VarType.VECTOR elif isinstance(size, tuple): if size[0] == mesh.dim and size[1] == mesh.dim: vtype = uw.VarType.TENSOR else: vtype = uw.VarType.MATRIX else: raise ValueError( "Unable to infer variable type from `num_components`. Please explicitly set the `vtype` parameter." ) self.vtype = vtype # Store unit metadata for variable # Convert string units to Pint Unit objects for consistency with MeshVariable if units is not None: if isinstance(units, str): # Parse string units to Pint Unit object # uw.units('K') returns a Quantity (1 kelvin), so we extract .units to get the Unit self._units = uw.units(units).units elif hasattr(units, "dimensionality"): # Already a pint.Unit object self._units = units else: # Fallback: store as-is (shouldn't happen) self._units = units # units_backend parameter is deprecated - Pint is the only supported backend if units_backend is not None and units_backend != "pint": raise ValueError( f"Unknown units backend: {units_backend}. Only 'pint' is supported." ) else: self._units = None # STRICT UNITS MODE CHECK # Enforce units-scales contract: variables with units require reference quantities if units is not None: model = uw.get_default_model() # Check if strict mode is enabled if uw.is_strict_units_active() and not model.has_units(): raise ValueError( f"Strict units mode: Cannot create swarm variable '{name}' with units='{units}' " f"when model has no reference quantities.\n\n" f"Options:\n" f" 1. Set reference quantities FIRST:\n" f" model = uw.get_default_model()\n" f" model.set_reference_quantities(\n" f" domain_depth=uw.quantity(1000, 'km'),\n" f" plate_velocity=uw.quantity(5, 'cm/year')\n" f" )\n\n" f" 2. Remove units parameter (use plain numbers):\n" f" swarm.add_variable('{name}', ...) # No units\n\n" f" 3. Disable strict mode (not recommended):\n" f" uw.use_strict_units(False)\n" ) # If not strict mode and no reference quantities, warn as before if not model.has_units(): warnings.warn( f"\nSwarm variable '{name}' has units '{units}' but no reference quantities are set.\n" f"Call model.set_reference_quantities() before creating variables with units.\n" f"Variable will use scaling_coefficient=1.0, which may lead to poor numerical conditioning.\n" f"Consider enabling strict mode: uw.use_strict_units(True)", UserWarning ) if not isinstance(vtype, uw.VarType): raise ValueError( "'vtype' must be an instance of 'Variable_Type', for example `underworld.VarType.SCALAR`." ) if vtype == uw.VarType.SCALAR: self.num_components = 1 self.shape = (1, 1) self.cpt_map = 0 elif vtype == uw.VarType.VECTOR: self.num_components = mesh.dim self.shape = (1, mesh.dim) self.cpt_map = tuple(range(0, mesh.dim)) elif vtype == uw.VarType.TENSOR: self.num_components = mesh.dim * mesh.dim self.shape = (mesh.dim, mesh.dim) elif vtype == uw.VarType.SYM_TENSOR: self.num_components = math.comb(mesh.dim + 1, 2) self.shape = (mesh.dim, mesh.dim) elif vtype == uw.VarType.MATRIX: self.num_components = self.shape[0] * self.shape[1] self._data_container = np.empty(self.shape, dtype=object) if (dtype == float) or (dtype == "float") or (dtype == np.float64): self.dtype = float petsc_type = PETSc.ScalarType elif (dtype == int) or (dtype == "int") or (dtype == np.int32) or (dtype == np.int64): self.dtype = int petsc_type = PETSc.IntType else: raise TypeError( f"Provided dtype={dtype} is not supported. Supported types are 'int' and 'float'." ) if _register: # Check if swarm is already populated - PETSc doesn't allow registering # new fields after DMSwarmFinalizeFieldRegister() has been called if self.swarm.local_size > 0: raise RuntimeError( f"Cannot add variable '{name}' to swarm: swarm is already populated " f"with {self.swarm.local_size} particles. Variables must be created " f"before calling swarm.populate() or any other operation that adds particles.\n" f"\nCorrect usage:\n" f" swarm = uw.swarm.Swarm(mesh)\n" f" variable = swarm.add_variable('{name}', {size}) # Create variables first\n" f" swarm.populate(fill_param=3) # Then populate with particles" ) self.swarm.dm.registerField(self.clean_name, self.num_components, dtype=petsc_type) self._data = None self._cached_data = None # add to swarms dict self.swarm._vars[self.clean_name] = self self._is_accessed = False # Initialize proxy flags first before creating proxy variable self._updating_proxy = False # Flag to prevent recursive proxy updates self._proxy_stale = True # Flag to track if proxy needs updating (lazy evaluation) # proxy variable self._proxy = _proxy self._vtype = vtype self._proxy_degree = proxy_degree self._proxy_continuous = proxy_continuous self._nn_proxy = _nn_proxy self._create_proxy_variable() # recycle swarm self._rebuild_on_cycle = rebuild_on_cycle self._register = _register from collections import namedtuple SwarmVariable_ij = namedtuple("SwarmVariable_ij", ["data", "sym"]) if self._proxy: for i in range(0, self.shape[0]): for j in range(0, self.shape[1]): self._data_container[i, j] = SwarmVariable_ij( data=f"SwarmVariable[...].data is only available within mesh.access() context", sym=self.sym[i, j], ) # Initialize DimensionalityMixin DimensionalityMixin.__init__(self) super().__init__() # Array interface now unified using NDArray_With_Callback (no legacy/enhanced split) self._array_cache = None self._flat_data_cache = None # Register with default model for orchestration uw.get_default_model()._register_variable(self.name, self) return
@property def units(self): """Return the units associated with this variable.""" return self._units @units.setter def units(self, value): """Set the units for this variable.""" # Convert string units to Pint Unit objects for consistency if value is not None and isinstance(value, str): self._units = uw.units(value).units else: self._units = value @property def has_units(self): """Check if this variable has units.""" return self._units is not None def _create_variable_array(self, initial_data=None): """ Factory function to create NDArray_With_Callback for variable data. Follows the same pattern as swarm.points implementation. Parameters ---------- initial_data : numpy.ndarray, optional Initial data for the array. If None, fetches current data from PETSc. Returns ------- NDArray_With_Callback Array object with callback for automatic PETSc synchronization """ if initial_data is None: initial_data = self.unpack_uw_data_from_petsc(squeeze=False, sync=True) # Create NDArray_With_Callback (following swarm._points pattern) array_obj = uw.utilities.NDArray_With_Callback( initial_data, owner=self, disable_inplace_operators=False, # Allow operations like existing arrays ) # Single callback function (following swarm_update_callback pattern) def variable_update_callback(array, change_context): """Callback to sync variable changes back to PETSc (like swarm.points)""" var = array.owner if var is None: # This guard handles cases where the array is accessed during # object teardown (e.g. at application exit or mesh rebuilds), # where the owning Python variable has already been garbage # collected but the NDArray proxy still exists. return # Only act on data-changing operations (following swarm.points pattern) data_changed = change_context.get("data_has_changed", True) if not data_changed: return # Skip updates during coordinate changes to prevent corruption if hasattr(var.swarm, "_migration_disabled") and var.swarm._migration_disabled: return # Persist changes to PETSc (like swarm callback updates coordinates) var.pack_uw_data_to_petsc(array, sync=True) # Register the callback (following swarm.points pattern) array_obj.add_callback(variable_update_callback) return array_obj def _create_canonical_data_array(self, initial_data=None): """ Create the single canonical data array with PETSc synchronization. This is the ONLY method that creates arrays with PETSc callbacks. Returns data in shape (-1, num_components) using pack_raw/unpack_raw methods. Parameters ---------- initial_data : numpy.ndarray, optional Initial data for the array. If None, fetches current data from PETSc. Returns ------- NDArray_With_Callback Canonical array object with callback for automatic PETSc synchronization """ if initial_data is None: # Use unpack_raw to get flat format (-1, num_components) initial_data = self.unpack_raw_data_from_petsc(squeeze=False, sync=True) # Handle case where unpack returns None (swarm not initialized) if initial_data is None: initial_data = np.zeros((0, self.num_components)) # Create NDArray_With_Callback for flat data array_obj = uw.utilities.NDArray_With_Callback( initial_data, owner=self, disable_inplace_operators=False, # Allow operations like existing arrays ) # Single canonical callback for PETSc synchronization def canonical_data_callback(array, change_context): """ONLY callback that handles PETSc synchronization - prevents conflicts""" # Only act on data-changing operations data_changed = change_context.get("data_has_changed", True) if not data_changed: return # Skip updates during migration to prevent corruption if hasattr(self.swarm, "_migration_disabled") and self.swarm._migration_disabled: return # Check for None array to prevent copy errors if array is None: return # STEP 1: Ensure array has correct canonical shape before PETSc sync # The callback might receive wrong-shaped arrays from array view operations import numpy as np canonical_array = np.atleast_2d(array) if canonical_array.shape != (array.shape[0], self.num_components): # Reshape to canonical format: (-1, num_components) canonical_array = canonical_array.reshape(-1, self.num_components) # STEP 1: Sync to PETSc using established method with correct shape self.pack_raw_data_to_petsc(canonical_array, sync=True) # STEP 2: Handle variable-specific updates (like IndexSwarmVariable proxy marking) if hasattr(self, "_on_data_changed"): self._on_data_changed() # Register the single canonical callback array_obj.add_callback(canonical_data_callback) return array_obj def _create_array_view(self): """ Create array view of canonical data using appropriate conversion strategy. Strategy depends on variable complexity: - Scalars/Vectors: Simple reshape operations - 2D+ Tensors: Complex pack/unpack operations Returns ------- ArrayView Array-like object that delegates changes back to canonical data """ if self._is_simple_variable(): return self._create_simple_array_view() else: return self._create_tensor_array_view() def _is_simple_variable(self): """Check if this is a simple scalar/vector variable (not a complex tensor)""" return len(self.shape) <= 1 or (len(self.shape) == 2 and self.shape[1] == 1) def _create_simple_array_view(self): """Array view for scalars/vectors using simple reshape operations""" import numpy as np class SimpleSwarmArrayView: def __init__(self, parent_var): self.parent = parent_var def _get_array_data(self): # Simple reshape: (-1, num_components) -> (N, a, b) data = self.parent.data # For simple variables, reshape to (N, a, b) format reshaped = data.reshape(data.shape[0], *self.parent.shape) # Apply dimensionalization if needed import underworld3 as uw from .utilities.unit_aware_array import UnitAwareArray # Check if variable has units and model has reference quantities model = uw.get_default_model() has_units = hasattr(self.parent, "units") and self.parent.units is not None if has_units and model.has_units(): # Variable has units - wrap with UnitAwareArray from .scaling import units as ureg var_units = self.parent.units if isinstance(var_units, str): var_units = ureg(var_units) # If ND scaling is active, data is non-dimensional and needs dimensionalization if uw.is_nondimensional_scaling_active(): # Get dimensionality pint_qty = 1.0 * var_units dimensionality = dict(pint_qty.dimensionality) # Dimensionalize: ND → dimensional using model reference scales # This returns a UnitAwareArray with SI base units dimensional_values = uw.dimensionalise(reshaped, target_dimensionality=dimensionality) # Convert from SI base units to variable's units (e.g., m/s → cm/yr) return dimensional_values.to(var_units) else: # ND scaling not active - data is already dimensional return UnitAwareArray(reshaped, units=var_units) else: # No units - return plain array return reshaped def __getitem__(self, key): return self._get_array_data()[key] def __setitem__(self, key, value): import underworld3 as uw # PRINCIPLE (2025-11-27): When units are active and variable has units, # we REQUIRE unit-aware input to avoid ambiguity. Plain arrays are ambiguous: # are they dimensional or non-dimensional? We don't guess. # # - Use .array for unit-aware assignment (requires UnitAwareArray) # - Use .data for non-dimensional assignment (plain arrays OK) has_unit_info = hasattr(value, 'magnitude') or hasattr(value, 'value') model = uw.get_default_model() var_has_units = hasattr(self.parent, 'units') and self.parent.units is not None units_active = model.has_units() and uw.is_nondimensional_scaling_active() if not has_unit_info and var_has_units and units_active: # Plain array assigned to unit-aware variable with scaling active # This is ambiguous - reject with helpful error var_units = self.parent.units raise ValueError( f"Cannot assign plain array to '{self.parent.name}.array' when units are active.\n" f"\n" f"The variable '{self.parent.name}' has units '{var_units}', but the assigned\n" f"value has no unit information. This is ambiguous: should the values be\n" f"interpreted as dimensional (in {var_units}) or non-dimensional?\n" f"\n" f"Solutions:\n" f" 1. Wrap with units: UnitAwareArray(data, units='{var_units}')\n" f" 2. Use uw.function.evaluate() which returns unit-aware arrays\n" f" 3. For non-dimensional values, use: {self.parent.name}.data[...] = value\n" ) # Get current NON-DIMENSIONAL array data # Note: We use data directly here, not _get_array_data() which dimensionalizes raw_data = self.parent.data array_data = raw_data.reshape(raw_data.shape[0], *self.parent.shape) # Create a copy to modify (avoid modifying view directly) modified_data = array_data.copy() if has_unit_info: # Value has units - need full conversion pipeline if model.has_units() and var_has_units: from .scaling import units as ureg # Step 1: Convert to variable's units target_units_str = self.parent.units if isinstance(self.parent.units, str) else str(self.parent.units) converted = value.to(target_units_str) # Extract numerical value if hasattr(converted, 'value'): dimensional_value = converted.value elif hasattr(converted, 'magnitude'): dimensional_value = converted.magnitude else: dimensional_value = float(converted) # Step 2: Non-dimensionalize if scaling active if uw.is_nondimensional_scaling_active(): target_units = ureg(target_units_str) temp_qty = uw.quantity(dimensional_value, target_units) nd_value = uw.non_dimensionalise(temp_qty) if hasattr(nd_value, 'value'): value = nd_value.value elif hasattr(nd_value, 'magnitude'): value = nd_value.magnitude else: value = nd_value else: value = dimensional_value else: # No units mode - just extract value/magnitude if hasattr(value, 'value'): value = value.value elif hasattr(value, 'magnitude'): value = value.magnitude else: value = float(value) # Update the specific elements modified_data[key] = value # Reshape back to canonical data format: ensure exact shape match reshaped_data = modified_data.reshape(-1, self.parent.num_components) self.parent.data[:] = reshaped_data # Forward common array methods def max(self): return self._get_array_data().max() def min(self): return self._get_array_data().min() def mean(self): """ Compute mean value of swarm particles. ⚠️ WARNING: This computes a simple arithmetic mean of the particle values. Since swarm particles are typically non-uniformly distributed in space, this is an APPROXIMATION of the spatial mean. For accurate spatial statistics, consider using integration via swarm proxy variables or computing mesh integrals of the proxy field. Returns ------- float or tuple Mean value (float for scalar variables, tuple for multi-component) """ return self._get_array_data().mean() def sum(self): return self._get_array_data().sum() def std(self): """ Compute standard deviation of swarm particles. ⚠️ WARNING: This computes a simple numpy std of the particle values. Since swarm particles are typically non-uniformly distributed in space, this is an APPROXIMATION of the spatial standard deviation. For accurate spatial statistics, consider using integration via swarm proxy variables or computing mesh integrals of the proxy field. Returns ------- float or tuple Standard deviation (float for scalar variables, tuple for multi-component) """ return self._get_array_data().std() @property def shape(self): return self._get_array_data().shape @property def dtype(self): return self._get_array_data().dtype def __array__(self): """Support for numpy functions like np.allclose(), np.isfinite(), etc.""" return self._get_array_data() def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): """Support for numpy universal functions""" # Convert all SimpleSwarmArrayView inputs to arrays converted_inputs = [] for input in inputs: if hasattr(input, "_get_array_data"): # Duck typing for array views converted_inputs.append(input._get_array_data()) else: converted_inputs.append(input) # Apply the ufunc to the converted inputs return ufunc(*converted_inputs, **kwargs) def delay_callback(self, description="array operation"): """Delegate to parent's canonical data delay_callback method""" return self.parent.data.delay_callback(description) return SimpleSwarmArrayView(self) def _create_tensor_array_view(self): """Array view for complex tensors using pack/unpack operations""" import numpy as np class TensorSwarmArrayView: def __init__(self, parent_var): self.parent = parent_var def _get_array_data(self): # Use complex pack/unpack for tensor layouts unpacked = self.parent.unpack_uw_data_from_petsc(squeeze=False) # Apply dimensionalization if needed import underworld3 as uw from .utilities.unit_aware_array import UnitAwareArray # Check if variable has units and model has reference quantities model = uw.get_default_model() has_units = hasattr(self.parent, "units") and self.parent.units is not None if has_units and model.has_units(): # Variable has units - wrap with UnitAwareArray from .scaling import units as ureg var_units = self.parent.units if isinstance(var_units, str): var_units = ureg(var_units) # If ND scaling is active, data is non-dimensional and needs dimensionalization if uw.is_nondimensional_scaling_active(): # Get dimensionality pint_qty = 1.0 * var_units dimensionality = dict(pint_qty.dimensionality) # Dimensionalize: ND → dimensional using model reference scales # This returns a UnitAwareArray with SI base units dimensional_values = uw.dimensionalise(unpacked, target_dimensionality=dimensionality) # Convert from SI base units to variable's units (e.g., m/s → cm/yr) return dimensional_values.to(var_units) else: # ND scaling not active - data is already dimensional return UnitAwareArray(unpacked, units=var_units) else: # No units - return plain array return unpacked def __getitem__(self, key): return self._get_array_data()[key] def __setitem__(self, key, value): import underworld3 as uw # PRINCIPLE (2025-11-27): When units are active and variable has units, # we REQUIRE unit-aware input to avoid ambiguity. Plain arrays are ambiguous: # are they dimensional or non-dimensional? We don't guess. # # - Use .array for unit-aware assignment (requires UnitAwareArray) # - Use .data for non-dimensional assignment (plain arrays OK) has_unit_info = hasattr(value, 'magnitude') or hasattr(value, 'value') model = uw.get_default_model() var_has_units = hasattr(self.parent, 'units') and self.parent.units is not None units_active = model.has_units() and uw.is_nondimensional_scaling_active() if not has_unit_info and var_has_units and units_active: # Plain array assigned to unit-aware variable with scaling active # This is ambiguous - reject with helpful error var_units = self.parent.units raise ValueError( f"Cannot assign plain array to '{self.parent.name}.array' when units are active.\n" f"\n" f"The variable '{self.parent.name}' has units '{var_units}', but the assigned\n" f"value has no unit information. This is ambiguous: should the values be\n" f"interpreted as dimensional (in {var_units}) or non-dimensional?\n" f"\n" f"Solutions:\n" f" 1. Wrap with units: UnitAwareArray(data, units='{var_units}')\n" f" 2. Use uw.function.evaluate() which returns unit-aware arrays\n" f" 3. For non-dimensional values, use: {self.parent.name}.data[...] = value\n" ) # Get current NON-DIMENSIONAL array data from PETSc # Note: We use unpack directly here, not _get_array_data() which dimensionalizes array_data = self.parent.unpack_uw_data_from_petsc(squeeze=False) # Create a copy to modify (avoid modifying view directly) modified_data = array_data.copy() if has_unit_info: # Value has units - need full conversion pipeline if model.has_units() and var_has_units: from .scaling import units as ureg # Step 1: Convert to variable's units target_units_str = self.parent.units if isinstance(self.parent.units, str) else str(self.parent.units) converted = value.to(target_units_str) # Extract numerical value if hasattr(converted, 'value'): dimensional_value = converted.value elif hasattr(converted, 'magnitude'): dimensional_value = converted.magnitude else: dimensional_value = float(converted) # Step 2: Non-dimensionalize if scaling active if uw.is_nondimensional_scaling_active(): target_units = ureg(target_units_str) temp_qty = uw.quantity(dimensional_value, target_units) nd_value = uw.non_dimensionalise(temp_qty) if hasattr(nd_value, 'value'): value = nd_value.value elif hasattr(nd_value, 'magnitude'): value = nd_value.magnitude else: value = nd_value else: value = dimensional_value else: # No units mode - just extract value/magnitude if hasattr(value, 'value'): value = value.value elif hasattr(value, 'magnitude'): value = value.magnitude else: value = float(value) # Update the specific elements modified_data[key] = value # Pack back to canonical data format packed_data = self.parent._pack_array_to_data_format(modified_data) self.parent.data[:] = packed_data # Forward common array methods def max(self): return self._get_array_data().max() def min(self): return self._get_array_data().min() def mean(self): """ Compute mean value of swarm particles. ⚠️ WARNING: This computes a simple arithmetic mean of the particle values. Since swarm particles are typically non-uniformly distributed in space, this is an APPROXIMATION of the spatial mean. For accurate spatial statistics, consider using integration via swarm proxy variables or computing mesh integrals of the proxy field. Returns ------- float or tuple Mean value (float for scalar variables, tuple for multi-component) """ return self._get_array_data().mean() def sum(self): return self._get_array_data().sum() def std(self): """ Compute standard deviation of swarm particles. ⚠️ WARNING: This computes a simple numpy std of the particle values. Since swarm particles are typically non-uniformly distributed in space, this is an APPROXIMATION of the spatial standard deviation. For accurate spatial statistics, consider using integration via swarm proxy variables or computing mesh integrals of the proxy field. Returns ------- float or tuple Standard deviation (float for scalar variables, tuple for multi-component) """ return self._get_array_data().std() @property def shape(self): return self._get_array_data().shape @property def dtype(self): return self._get_array_data().dtype def __array__(self): """Support for numpy functions like np.allclose(), np.isfinite(), etc.""" return self._get_array_data() def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): """Support for numpy universal functions""" # Convert all TensorSwarmArrayView inputs to arrays converted_inputs = [] for input in inputs: if hasattr(input, "_get_array_data"): # Duck typing for array views converted_inputs.append(input._get_array_data()) else: converted_inputs.append(input) # Apply the ufunc to the converted inputs return ufunc(*converted_inputs, **kwargs) def delay_callback(self, description="array operation"): """Delegate to parent's canonical data delay_callback method""" return self.parent.data.delay_callback(description) return TensorSwarmArrayView(self) def _pack_array_to_data_format(self, array_data): """Convert array format (N,a,b) back to canonical data format (N,components)""" # Use existing pack logic but return numpy array instead of writing to PETSc # This is a pure conversion method - no PETSc access return array_data.reshape(array_data.shape[0], -1) # Legacy methods preserved for backward compatibility (now do nothing)
[docs] def use_legacy_array(self): """Deprecated: Array interface is now unified using NDArray_With_Callback""" pass
[docs] def use_enhanced_array(self): """Deprecated: Array interface is now unified using NDArray_With_Callback""" pass
[docs] def sync_disabled(self, description="batch operation"): """ Context manager to disable automatic synchronization for batch operations. Now uses NDArray_With_Callback's delay_callback mechanism. Parameters ---------- description : str Description of the batch operation for debugging """ # Use NDArray_With_Callback's built-in delay mechanism return self.array.delay_callback(description)
## Should be a single master copy (mesh variable / swarm variable) def _data_layout(self, i, j=None): # mapping if self.vtype == uw.VarType.SCALAR: return 0 if self.vtype == uw.VarType.VECTOR: if i < 0 or j < 0: return self.swarm.dim else: if j is None: return i elif i == 0: return j else: raise IndexError( f"Vectors have shape {self.swarm.dim} or {(1, self.swarm.dim)} " ) if self.vtype == uw.VarType.TENSOR: if self.swarm.dim == 2: if i < 0 or j < 0: return 4 else: return ((0, 1), (2, 3))[i][j] else: if i < 0 or j < 0: return 9 else: return ((0, 1, 2), (3, 4, 5), (6, 7, 8))[i][j] if self.vtype == uw.VarType.SYM_TENSOR: if self.swarm.dim == 2: if i < 0 or j < 0: return 3 else: return ((0, 2), (2, 1))[i][j] else: if i < 0 or j < 0: return 6 else: return ((0, 3, 4), (3, 1, 5), (4, 5, 2))[i][j] if self.vtype == uw.VarType.MATRIX: if i < 0 or j < 0: return self.shape[0] * self.shape[1] else: return i + j * self.shape[0] def _create_proxy_variable(self): # release if defined old_meshVar = getattr(self, "_meshVar", None) self._meshVar = None if self._proxy: # REINIT policy: a swarm proxy is re-projected from the # *particles* on next access, so a mesh adapt should NOT # interpolate the old proxy values onto the new node # layout — that would freeze stale per-particle data on the # new mesh and miss particle migration. The helper marks the # var stale via _mark_reinit_stale; we wire that callback # below to set ``self._proxy_stale = True`` so the next # access re-projects. self._meshVar = uw.discretisation.MeshVariable( "proxy_" + self.clean_name, self.swarm.mesh, self.shape, self._vtype, degree=self._proxy_degree, continuous=self._proxy_continuous, varsymbol=r"\left<" + self.symbol + r"\right>", remesh_policy="reinit", ) # The remesh helper calls this on REINIT vars after an # adapt. Bound here so the closure captures ``self`` (the # SwarmVariable) rather than the proxy MeshVariable. self._meshVar._remesh_reinit_callback = ( lambda _self=self: setattr(_self, "_proxy_stale", True)) def _update(self): """ Mark proxy mesh variable as stale for lazy evaluation. The actual update happens when the proxy is accessed. """ # if not proxied, nothing to do. return. if not self._meshVar: return # Mark proxy as stale for lazy evaluation (avoids immediate PETSc access conflicts) self._proxy_stale = True return def _update_proxy_if_stale(self): """ Actually update the proxy mesh variable if it's marked as stale. This implements lazy evaluation to avoid PETSc access conflicts. """ # if not proxied, nothing to do. return. if not self._meshVar: return # Only update if stale and not already updating if not self._proxy_stale or self._updating_proxy: return try: self._updating_proxy = True self._rbf_to_meshVar(self._meshVar) self._proxy_stale = False # Mark as fresh finally: self._updating_proxy = False return # Maybe rbf_interpolate for this one and meshVar is a special case def _rbf_to_meshVar(self, meshVar, nnn=None, verbose=False): """ Here is how it works: for each particle, create a distance-weighted average on the node data Todo: caching the k-d trees etc for the proxy-mesh-variable nodal points Todo: some form of global fall-back for when there are no particles on a processor """ # Mapping to the coordinates of the variable from the # particle coords if nnn is None: nnn = self.swarm.mesh.dim + 1 if meshVar.mesh != self.swarm.mesh: # If this is our own proxy variable and mesh has changed, recreate it if hasattr(self, "_meshVar") and meshVar is self._meshVar: self._create_proxy_variable() # Use the newly created proxy variable meshVar = self._meshVar else: raise RuntimeError("Cannot map a swarm to a different mesh") new_coords = meshVar.coords Values = self.rbf_interpolate(new_coords, verbose=verbose, nnn=nnn) meshVar.data[...] = Values[...] return def _rbf_reduce_to_meshVar(self, meshVar, verbose=False): """ This method updates a mesh variable for the current swarm & particle variable state by reducing the swarm to the nearest point for each particle Here is how it works: 1) for each particle, create a distance-weighted average on the node data 2) check to see which nodes have zero weight / zero contribution and replace with nearest particle value Todo: caching the k-d trees etc for the proxy-mesh-variable nodal points Todo: some form of global fall-back for when there are no particles on a processor """ # if not proxied, nothing to do. return. if not self._meshVar: return # 1 - Average particles to nodes with distance weighted average # Use cached KDTree for interpolation (avoids redundant index construction) kd = meshVar._get_kdtree() with self.swarm.access(): d, n = kd.query(self.swarm.data, k=1, sqr_dists=False) # need actual distances node_values = np.zeros((meshVar.coords.shape[0], self.num_components)) w = np.zeros(meshVar.coords.shape[0]) if not self._nn_proxy: for i in range(self.local_size): # if b[i]: node_values[n[i], :] += self.data[i, :] / (1.0e-24 + d[i]) w[n[i]] += 1.0 / (1.0e-24 + d[i]) node_values[np.where(w > 0.0)[0], :] /= w[np.where(w > 0.0)[0]].reshape(-1, 1) # 2 - set NN vals on mesh var where w == 0.0 p_nnmap = self.swarm._get_map(self) meshVar.data[...] = node_values[...] meshVar.data[np.where(w == 0.0), :] = self.data[p_nnmap[np.where(w == 0.0)], :] return # # Need to be able to unpack as well # def pack_raw_data_to_petsc(self, data_array): # """Convert an array in the correct shape for the underlying variable into something that can be loaded into # the flat storage structure used by PETSc in a numpy assigment (with index broadcasting etc) # """ # shape = self.shape # storage_size = self._data_layout(-1) # data_array_3d = data_array.reshape(-1, *self.shape) # with self.swarm.access(self): # for i in range(shape[0]): # for j in range(shape[1]): # ij = self._data_layout(i, j) # self._data[:, ij] = data_array_3d[:, i, j] # return
[docs] def pack_uw_data_to_petsc(self, data_array, sync=True): """ Enhanced pack method that directly accesses PETSc field without access() context. Designed for the new swarmVariable.array interface. Parameters ---------- data_array : numpy.ndarray Array data to pack into PETSc field sync : bool Whether to sync parallel operations (default True) """ shape = self.shape data_array_3d = data_array.reshape(-1, *self.shape) # Direct PETSc field access without context manager petsc_data = self.swarm.dm.getField(self.clean_name).reshape((-1, self.num_components)) try: # Pack data using same layout as original method for i in range(shape[0]): for j in range(shape[1]): ij = self._data_layout(i, j) petsc_data[:, ij] = data_array_3d[:, i, j] # Increment variable state to track changes self._increment() # Update the proxy mesh variable if one exists (for integral calculations) self._update() # Sync parallel operations if requested if sync: # TODO: Add parallel sync logic here if needed pass finally: # Always restore the field self.swarm.dm.restoreField(self.clean_name)
# def unpack_raw_data_to_petsc(self, squeeze=True): # """Return an array in the correct shape for the underlying variable from # the flat storage structure used by PETSc. By default, use numpy squeeze to remove additional # dimensions (keep those dimensions to leave all data as 3D array - scalars being shape (1,1), vectors # being (1,dim) and so on) # """ # shape = self.shape # with self.swarm.access(): # points = self._data.shape[0] # data_array_3d = np.empty(shape=(points, *shape), dtype=self._data.dtype) # for i in range(shape[0]): # for j in range(shape[1]): # ij = self._data_layout(i, j) # data_array_3d[:, i, j] = self._data[:, ij] # if squeeze: # return data_array_3d.squeeze() # else: # return data_array_3d
[docs] def unpack_uw_data_from_petsc(self, squeeze=True, sync=True): """ Enhanced unpack method that directly accesses PETSc field without access() context. Designed for the new swarmVariable.array interface. Parameters ---------- squeeze : bool Whether to squeeze singleton dimensions (default True) sync : bool Whether to sync parallel operations (default True) """ shape = self.shape # Direct PETSc field access without context manager petsc_data = self.swarm.dm.getField(self.clean_name).reshape((-1, self.num_components)) try: # Sync parallel operations if requested if sync: # TODO: Add parallel sync logic here if needed pass # Unpack data using same layout as original method points = petsc_data.shape[0] data_array_3d = np.empty(shape=(points, *shape), dtype=petsc_data.dtype) for i in range(shape[0]): for j in range(shape[1]): ij = self._data_layout(i, j) data_array_3d[:, i, j] = petsc_data[:, ij] finally: # Always restore the field self.swarm.dm.restoreField(self.clean_name) if squeeze: return data_array_3d.squeeze() else: return data_array_3d
[docs] def pack_raw_data_to_petsc(self, data_array, sync=True): """ Pack data array to PETSc using traditional data shape (-1, num_components). Direct PETSc access without access() context for backward compatibility. Parameters ---------- data_array : numpy.ndarray Array data in traditional flat format (-1, num_components) sync : bool Whether to sync parallel operations (default True) """ import numpy as np # Convert to expected shape: (-1, num_components) data_array = np.atleast_2d(data_array) if data_array.shape[1] != self.num_components: raise ValueError( f"Data array must have shape (-1, {self.num_components}), got {data_array.shape}" ) # Direct PETSc field access without context manager petsc_data = self.swarm.dm.getField(self.clean_name).reshape((-1, self.num_components)) try: # Direct assignment in traditional flat format petsc_data[:] = data_array # Increment variable state to track changes self._increment() # Update the proxy mesh variable if one exists (for integral calculations) self._update() # Sync parallel operations if requested if sync: # TODO: Add parallel sync logic here if needed pass finally: # Always restore the field self.swarm.dm.restoreField(self.clean_name) return
[docs] def unpack_raw_data_from_petsc(self, squeeze=True, sync=True): """ Unpack data from PETSc in traditional data shape (-1, num_components). Direct PETSc access without access() context for backward compatibility. Parameters ---------- squeeze : bool Whether to remove singleton dimensions (default True) sync : bool Whether to sync parallel operations (default True) Returns ------- numpy.ndarray Array data in traditional flat format (-1, num_components) """ import numpy as np # Check if swarm has any particles before accessing field swarm_size = self.swarm.local_size if swarm_size <= 0: # Swarm not populated yet, return empty array return np.zeros((0, self.num_components)) # Direct PETSc field access without context manager field_data = self.swarm.dm.getField(self.clean_name) if field_data is None: # Field not properly initialized, restore and return empty array self.swarm.dm.restoreField(self.clean_name) return np.zeros((0, self.num_components)) petsc_data = field_data.reshape((-1, self.num_components)) try: # Return data in traditional flat format result = petsc_data.copy() # Sync parallel operations if requested if sync: # TODO: Add parallel sync logic here if needed pass finally: # Always restore the field self.swarm.dm.restoreField(self.clean_name) if squeeze: return result.squeeze() else: return result
def _object_viewer(self): """This will substitute specific information about this object""" from IPython.display import Latex, Markdown, display from textwrap import dedent # feedback on this instance # display( Markdown( f"""**SwarmVariable:** > symbol: ${self.symbol}$\n > shape: ${self.shape}$\n > proxy: ${self._proxy}$\n > proxy_degree: ${self._proxy_degree}$\n > proxy_continuous: `{self._proxy_continuous}`\n > type: `{self.vtype.name}`""" ), ) display(self.data), return
[docs] def rbf_interpolate(self, new_coords, verbose=False, nnn=None): """ Radial basis function interpolation of particle data to arbitrary points. Uses inverse-distance weighting to interpolate particle values to new coordinate locations. Parameters ---------- new_coords : numpy.ndarray Target coordinates of shape (N, dim) to interpolate to. verbose : bool, default=False Print diagnostic information during interpolation. nnn : int, optional Number of nearest neighbors to use. Defaults to ``mesh.dim + 1``. Returns ------- numpy.ndarray Interpolated values at the target coordinates. """ # An inverse-distance mapping is quite robust here ... as long # as we take care of the case where some nodes coincide (likely if used with mesh2mesh) # We try to eliminate contributions from recently remeshed particles import numpy as np # Get data directly from PETSc to avoid circular callback dependencies raw_data = self.unpack_raw_data_from_petsc(squeeze=False, sync=False) data_size = raw_data.shape # What to do if there are no particles if data_size[0] <= 1: return np.zeros((new_coords.shape[0], data_size[1])) if nnn is None: nnn = self.swarm.mesh.dim + 1 if nnn > data_size[0]: nnn = data_size[0] # Use direct PETSc access to avoid callback circular dependency if self.swarm.recycle_rate > 1: not_remeshed = self.swarm._remeshed.data[:, 0] != 0 D = raw_data[not_remeshed].copy() kdt = uw.kdtree.KDTree(self.swarm._particle_coordinates.data[not_remeshed, :]) values = kdt.rbf_interpolator_local(new_coords, D, nnn, 2, verbose) else: D = raw_data.copy() # Use cached KDTree for standard swarms kdt = self.swarm._get_kdtree() values = kdt.rbf_interpolator_local(new_coords, D, nnn, 2, verbose) return values
@property def swarm(self): """ The swarm this variable belongs to (accessed via weak reference). Raises RuntimeError if the swarm has been garbage collected. """ if self._swarm_ref is None: raise RuntimeError("SwarmVariable has no swarm reference (internal error)") swarm = self._swarm_ref() if swarm is None: raise RuntimeError( f"Swarm for variable '{self.clean_name}' has been garbage collected. " "Variables cannot outlive their parent swarm." ) return swarm @property def old_data(self): """TESTING: Original data property implementation.""" if self._data is None: raise RuntimeError("Data must be accessed via the swarm `access()` context manager.") return self._data @property def data(self): """ Canonical data storage in flat format for internal operations. Returns particle data in shape ``(-1, num_components)`` regardless of variable type. Values are always **non-dimensional** (no unit conversion). This property handles PETSc synchronization. The ``.array`` property is a view of this with shape conversion. When to Use ----------- - **Variable-to-variable transfers**: Copying data between swarm variables avoids redundant unit conversions - **Low-level operations**: Direct access to particle data - **Backward compatibility**: Existing code using flat format For general user access, prefer ``.array`` which provides a structured shape. Returns ------- NDArray_With_Callback Array with shape ``(-1, num_components)`` with automatic PETSc sync. Examples -------- >>> # Efficient variable-to-variable copy >>> new_material.data[...] = old_material.data[...] >>> # Check data shape >>> scalar_var.data.shape # (n_particles, 1) >>> vector_var.data.shape # (n_particles, dim) See Also -------- array : Structured format ``(N, a, b)`` for general access. sym : Symbolic representation for equations. """ # Cache and reuse canonical data object to avoid field access conflicts # Use direct __dict__ check to avoid MathematicalMixin recursion if "_canonical_data" not in self.__dict__ or self._canonical_data is None: # Create the single canonical data array with PETSc sync self._canonical_data = self._create_canonical_data_array() return self._canonical_data @property def array(self): """ Primary interface for reading and writing particle data. Returns a structured array view with shape ``(N, a, b)`` where ``N`` is the number of particles and ``(a, b)`` depends on the variable type: - Scalar: ``(N, 1, 1)`` - Vector: ``(N, 1, dim)`` - Tensor: ``(N, dim, dim)`` Returns ------- NDArray Array view that delegates changes back to canonical storage. Examples -------- >>> # Scalar field initialization >>> material_property.array[:, 0, 0] = 1e21 # Set viscosity >>> # Vector field >>> velocity.array[:, 0, 0] = vx_values # x-component >>> velocity.array[:, 0, 1] = vy_values # y-component >>> # Reading values >>> max_visc = material_property.array[:, 0, 0].max() Notes ----- This property is a view of the canonical ``.data`` property with automatic shape conversion. All modifications trigger proxy variable updates for mesh interpolation. See Also -------- data : Flat format ``(-1, components)`` for variable-to-variable transfers. sym : Symbolic representation for use in equations. """ return self._create_array_view() @array.setter def array(self, array_value): """ Set variable data through canonical data property with format conversion. """ if self._is_simple_variable(): # Simple case: reshape array format (N,a,b) to canonical format (N,components) canonical_data = array_value.reshape(array_value.shape[0], -1) else: # Complex case: use pack operations for tensor layout conversion canonical_data = self._pack_array_to_data_format(array_value) # Assign to canonical data property (triggers PETSc sync) self.data[:] = canonical_data @property def sym(self): r"""Symbolic representation for use in equations. Returns the symbolic expression from the proxy mesh variable, which can be used in SymPy expressions for constitutive models, boundary conditions, etc. Returns ------- sympy.Matrix Symbolic matrix expression. Notes ----- The proxy is automatically updated if particle data has changed. """ # Ensure proxy is up to date before returning symbolic representation self._update_proxy_if_stale() return self._meshVar.sym @property def sym_1d(self): r"""Flattened symbolic representation. Returns the symbolic expression as a 1D (column) vector form, useful for Voigt notation in tensor calculations. Returns ------- sympy.Matrix Flattened symbolic expression. """ # Ensure proxy is up to date before returning symbolic representation self._update_proxy_if_stale() return self._meshVar.sym_1d # Global statistics methods (MPI-aware) for particle data # Note: Only methods that make sense for non-uniformly distributed particles # are provided. Mean/RMS/variance are NOT provided because particles cluster # unevenly in the domain, making these statistics misleading. @uw.collective_operation def global_max(self, axis=None, out=None, keepdims=False): """ Maximum value across all MPI ranks. Finds the maximum value of the particle property across all processors. Useful for finding extreme values in particle swarm data. Parameters ---------- axis : None, int, or tuple of ints, optional Axis or axes along which to operate. By default, flattened input is used. out : None, optional Alternative output array (not supported, kept for API compatibility). keepdims : bool, optional If True, reduced axes are left as dimensions with size one. Returns ------- UWQuantity or scalar Maximum value with units preserved (if variable has units). Examples -------- >>> max_temp = temperature_swarm.global_max() >>> print(f"Maximum temperature: {max_temp}") Notes ----- This is a collective operation - all ranks must call it. The result is identical on all ranks. """ from underworld3.utilities.unit_aware_array import UnitAwareArray # Wrap data in UnitAwareArray to use its global_max implementation temp_array = UnitAwareArray(self.data, units=self._units) return temp_array.global_max(axis=axis, out=out, keepdims=keepdims) @uw.collective_operation def global_min(self, axis=None, out=None, keepdims=False): """ Minimum value across all MPI ranks. Finds the minimum value of the particle property across all processors. Useful for finding extreme values in particle swarm data. Parameters ---------- axis : None, int, or tuple of ints, optional Axis or axes along which to operate. By default, flattened input is used. out : None, optional Alternative output array (not supported, kept for API compatibility). keepdims : bool, optional If True, reduced axes are left as dimensions with size one. Returns ------- UWQuantity or scalar Minimum value with units preserved (if variable has units). Examples -------- >>> min_pressure = pressure_swarm.global_min() >>> print(f"Minimum pressure: {min_pressure}") Notes ----- This is a collective operation - all ranks must call it. The result is identical on all ranks. """ from underworld3.utilities.unit_aware_array import UnitAwareArray temp_array = UnitAwareArray(self.data, units=self._units) return temp_array.global_min(axis=axis, out=out, keepdims=keepdims) @uw.collective_operation def global_sum(self, axis=None, out=None, keepdims=False): """ Sum of values across all MPI ranks. Computes the sum of particle property values across all processors. Parameters ---------- axis : None, int, or tuple of ints, optional Axis or axes along which to operate. By default, flattened input is used. out : None, optional Alternative output array (not supported, kept for API compatibility). keepdims : bool, optional If True, reduced axes are left as dimensions with size one. Returns ------- UWQuantity or scalar Sum with units preserved (if variable has units). Notes ----- This is a collective operation - all ranks must call it. The result is identical on all ranks. Warning: This sum is NOT a physical domain-integrated quantity because particles are non-uniformly distributed. For domain integration, use the proxy mesh variable with uw.maths.Integral(). """ from underworld3.utilities.unit_aware_array import UnitAwareArray temp_array = UnitAwareArray(self.data, units=self._units) return temp_array.global_sum(axis=axis, out=out, keepdims=keepdims) @uw.collective_operation def global_norm(self, ord=None): """ L2 norm (Frobenius norm) across all MPI ranks. Computes the L2 norm of particle property values: sqrt(sum(x**2)) across all processors. Parameters ---------- ord : {non-zero int, inf, -inf, 'fro', 'nuc'}, optional Order of the norm (default: None = 2-norm) Returns ------- UWQuantity or scalar L2 norm with units preserved (if variable has units). Notes ----- This is a collective operation - all ranks must call it. The result is identical on all ranks. For vectors, computes the Frobenius norm treating the array as flattened. Warning: This norm is NOT a physical domain-integrated quantity because particles are non-uniformly distributed. """ from underworld3.utilities.unit_aware_array import UnitAwareArray temp_array = UnitAwareArray(self.data, units=self._units) return temp_array.global_norm(ord=ord) @uw.collective_operation def global_size(self): """ Total particle count across all MPI ranks. Returns the total number of particles across all processors. Useful for population monitoring and load balancing diagnostics. Returns ------- int Total number of particles across all ranks. Examples -------- >>> total_particles = swarm_var.global_size() >>> local_particles = swarm_var.data.shape[0] >>> print(f"Rank has {local_particles} of {total_particles} particles") Notes ----- This is a collective operation - all ranks must call it. The result is identical on all ranks. """ from underworld3.utilities.unit_aware_array import UnitAwareArray temp_array = UnitAwareArray(self.data, units=self._units) return temp_array.global_size()
[docs] @timing.routine_timer_decorator def save( self, filename: int, compression: Optional[bool] = False, compressionType: Optional[str] = "gzip", force_sequential=False, ): """ Save the swarm variable to a h5 file. Parameters ---------- filename : The filename of the swarm variable to save to disk. compression : Add compression to the h5 files (saves space but increases write times with increasing no. of processors) compressionType : Type of compression to use, 'gzip' and 'lzf' supported. 'gzip' is default. Compression also needs to be set to 'True'. force_sequential : activate the serial version of hdf5 """ if h5py.h5.get_config().mpi == False and comm.size > 1 and comm.rank == 0: warnings.warn( "Collective IO not possible as h5py not available in parallel mode. Switching to sequential. This will be slow for models running on multiple processors", stacklevel=2, ) if compression == True and comm.rank == 0: warnings.warn("Compression may slow down write times", stacklevel=2) if filename.endswith(".h5") == False: raise RuntimeError("The filename must end with .h5") local_data = self.data[:] local_n = local_data.shape[0] n_components = self.num_components if h5py.h5.get_config().mpi == True and not force_sequential: # BUGFIX(#151): the previous parallel path called # h5f.create_dataset("data", data=self.data[:]) # collectively, but each rank passed its own local-sized array. # In parallel HDF5 every rank must specify the *same* dataset # shape on a collective create_dataset; passing different shapes # leaves HDF5's internal metadata inconsistent so the collective # close never synchronises, producing a silent hang. # # Fix: allgather the per-rank sizes, create the dataset at the # global shape, then each rank writes its own slice. sizes = comm.allgather(local_n) total_n = sum(sizes) offset = sum(sizes[: comm.rank]) with h5py.File(f"{filename[:-3]}.h5", "w", driver="mpio", comm=comm) as h5f: if compression == True: dset = h5f.create_dataset( "data", shape=(total_n, n_components), dtype=local_data.dtype, chunks=True, compression=compressionType, ) else: dset = h5f.create_dataset( "data", shape=(total_n, n_components), dtype=local_data.dtype, ) if local_n > 0: dset[offset : offset + local_n] = local_data else: # Sequential fallback: rank 0 creates the file and writes its slab, # then each higher rank appends in turn. Indentation here matters — # the barrier/loop must be outside the rank-0 branch so all ranks # synchronise (the previous version nested them inside, leaving # higher ranks' data unwritten and rank 0 deadlocked at the # barrier with no peers). if comm.rank == 0: with h5py.File(f"{filename[:-3]}.h5", "w") as h5f: if compression == True: h5f.create_dataset( "data", data=local_data, chunks=True, maxshape=(None, n_components), compression=compressionType, ) else: h5f.create_dataset( "data", data=local_data, chunks=True, maxshape=(None, n_components), ) comm.barrier() for proc in range(1, comm.size): if comm.rank == proc and local_n > 0: with h5py.File(f"{filename[:-3]}.h5", "a") as h5f: incoming_size = h5f["data"].shape[0] h5f["data"].resize((incoming_size + local_n), axis=0) h5f["data"][incoming_size:] = local_data comm.barrier() ## Add swarm variable unit metadata to the file import json # Use preferred selective_ranks pattern for unit metadata with uw.selective_ranks(0) as should_execute: if should_execute: with h5py.File(f"{filename[:-3]}.h5", "a") as h5f: # Add swarm variable unit metadata swarm_metadata = { "coordinate_units": ( str(self.swarm.coordinate_units) if hasattr(self.swarm, "coordinate_units") else None ), "variable_units": ( str(self.units) if hasattr(self, "units") and self.units else None ), "variable_dimensionality": ( str(self.dimensionality) if hasattr(self, "dimensionality") else None ), "units_backend": "pint" if self.has_units else None, "proxy_degree": self._proxy_degree, "num_components": self.num_components, "variable_name": self.name, } # Store in dataset attributes if "data" in h5f: h5f["data"].attrs["units_metadata"] = json.dumps(swarm_metadata) return
[docs] @timing.routine_timer_decorator def write_proxy(self, filename: str): # if not proxied, nothing to do. return. if not self._meshVar: uw.pprint("No proxy mesh variable that can be saved") return self._meshVar.write(filename) return
[docs] @timing.routine_timer_decorator def read_timestep( self, data_filename: str, swarmID: str, data_name: str, index: int, outputPath="", ): # mesh.write_timestep( "test", meshUpdates=False, meshVars=[X], outputPath="", index=0) # swarm.write_timestep("test", "swarm", swarmVars=[var], outputPath="", index=0) output_base_name = os.path.join(outputPath, data_filename) swarmFilename = output_base_name + f".{swarmID}.{index:05}.h5" filename = output_base_name + f".{swarmID}.{data_name}.{index:05}.h5" # check if swarmFilename exists if os.path.isfile(os.path.abspath(swarmFilename)): # easier to debug abs path print(f"Reading swarm information from {swarmFilename}", flush=True) pass else: raise RuntimeError(f"{os.path.abspath(swarmFilename)} does not exist") if os.path.isfile(os.path.abspath(filename)): print(f"Reading variable information from {filename}", flush=True) pass else: raise RuntimeError(f"{os.path.abspath(filename)} does not exist") # Memory-bounded parallel read. Rank 0 reads the saved file in one # shot (this is not chunked — the file is the smallest copy of the # data and we hold it for the duration of one ``add_particles`` call; # streaming hyperslabs is a follow-up if rank-0 RAM becomes the # binding constraint). Saved (coord, value) pairs are then pushed # into a transient routing swarm and migrated *with the same rule # the live swarm uses* — ``points_in_domain`` plus the centroid # fallback inside ``Swarm.migrate``. That co-locates a saved point # and the corresponding live particle on the same rank, so the # rank-local KDTree always sees the right neighbour. n_components = self.num_components dim = self.swarm.mesh.dim if uw.mpi.rank == 0: with ( h5py.File(f"{filename}", "r") as h5f_data, h5py.File(f"{swarmFilename}", "r") as h5f_swarm, ): file_dtype = h5f_data["data"].dtype if self.dtype != file_dtype: warnings.warn( f"{os.path.basename(filename)} dtype ({file_dtype}) " f"does not match {self.name} swarm variable dtype " f"({self.dtype}) which may result in a loss of data.", stacklevel=2, ) X_chunk = h5f_swarm["coordinates"][()].reshape(-1, dim) D_chunk = h5f_data["data"][()].reshape(-1, n_components) else: X_chunk = np.empty((0, dim), dtype=np.double) D_chunk = np.empty((0, n_components), dtype=np.double) tmp_swarm = uw.swarm.Swarm(self.swarm.mesh) saved = SwarmVariable( "_read_timestep_saved", tmp_swarm, vtype=uw.VarType.MATRIX, size=(1, n_components), dtype=float, _proxy=False, varsymbol=r"\cal{S}", ) size_before = max(tmp_swarm.dm.getLocalSize(), 0) tmp_swarm.add_particles_with_global_coordinates(X_chunk, migrate=False) tmp_swarm._invalidate_canonical_data() saved.array[size_before:, 0, :] = D_chunk[:, :] # Use the same migration rule as the live swarm so saved points and # live particles at the same coordinate land on the same rank. # ``delete_lost_points=False`` keeps points that ``points_in_domain`` # rejects on every rank; they fall through to the centroid fallback # inside ``Swarm.migrate`` and end up somewhere deterministic. tmp_swarm.migrate(remove_sent_points=True, delete_lost_points=False) landed_X = tmp_swarm._particle_coordinates.array[...].reshape(-1, dim) landed_D = saved.array[:, 0, :] if landed_X.shape[0] == 0: warnings.warn( f"[rank {uw.mpi.rank}] read_timestep: no saved swarm points " f"landed locally; '{self.name}' on this rank will not be updated", stacklevel=2, ) else: kdt = uw.kdtree.KDTree(landed_X) self.array[:, 0, :] = kdt.rbf_interpolator_local( self.swarm._particle_coordinates.data, landed_D, nnn=1 ) return
[docs] class IndexSwarmVariable(SwarmVariable): """ Integer-valued swarm variable for material tracking. IndexSwarmVariable stores integer indices at particle locations, typically used for tracking distinct material types. It automatically generates symbolic mask expressions for each material index, enabling material- dependent properties in constitutive models. Parameters ---------- name : str Variable name for identification and I/O. swarm : Swarm Parent swarm object. indices : int Number of distinct material indices (default 1). proxy_degree : int Polynomial degree for mesh projection (default 1). proxy_continuous : bool Whether mesh proxy is continuous (default True). Examples -------- >>> material = IndexSwarmVariable("M", swarm, indices=3) >>> material.data[:] = 0 # Set all particles to material 0 >>> # Use sym[i] as multiplier for material i properties >>> viscosity = material.sym[0] * 1e20 + material.sym[1] * 1e21 See Also -------- SwarmVariable : Base class for particle-supported variables. """
[docs] @timing.routine_timer_decorator def __init__( self, name, swarm, indices=1, proxy_degree=1, proxy_continuous=True, update_type=0, npoints=5, radius=0.5, npoints_bc=2, ind_bc=None, varsymbol=None, ): self.indices = indices self.nnn = npoints self.radius_s = radius # **2 # changed to radius self.update_type = update_type if self.update_type == 1: self.nnn_bc = npoints_bc self.ind_bc = ind_bc # These are the things we require of the generic swarm variable type super().__init__( name, swarm, size=1, vtype=None, dtype=int, proxy_degree=proxy_degree, proxy_continuous=proxy_continuous, _proxy=False, varsymbol=varsymbol, ) """ vtype = (None,) dtype = (float,) proxy_degree = (1,) proxy_continuous = (True,) _register = (True,) _proxy = (True,) _nn_proxy = (False,) varsymbol = (None,) rebuild_on_cycle = (True,) """ # The indices variable defines how many "level set" maps we create as components in the proxy variable import sympy self._MaskArray = sympy.Matrix.zeros(1, self.indices) self._meshLevelSetVars = [None] * self.indices for i in range(indices): self._meshLevelSetVars[i] = uw.discretisation.MeshVariable( name + R"^{[" + str(i) + R"]}", self.swarm.mesh, num_components=1, degree=proxy_degree, continuous=proxy_continuous, ) self._MaskArray[0, i] = self._meshLevelSetVars[i].sym[0, 0] # Initialize lazy evaluation state self._proxy_stale = True # Proxy variables need initial update return
def _update(self): """ Backward compatibility wrapper for _update_proxy_variables. Maintains existing API while implementing lazy evaluation internally. """ self._update_proxy_variables() def _on_data_changed(self): """ Hook called by unified data callback when canonical data changes. For IndexSwarmVariable, this marks proxy variables as stale for lazy evaluation. This replaces the complex custom array override with a simple hook. """ self._proxy_stale = True # This is the sympy vector interface - it's meaningless if these are not spatial arrays @property def sym(self): """ Lazy evaluation of symbolic mask array. Only updates proxy variables when they're actually needed (when sym is accessed) and only if the proxy variables are marked as stale due to data changes. This avoids expensive RBF interpolation during data assignment operations. """ if self._proxy_stale: self._update_proxy_variables() self._proxy_stale = False return self._MaskArray @property def sym_1d(self): """ One-dimensional symbolic mask array (alias for :attr:`sym`). Returns the same symbolic mask array as :attr:`sym`, provided for API compatibility with other variable types that distinguish between multi-dimensional and flattened representations. Returns ------- sympy.Matrix Symbolic mask array of shape (indices, 1). See Also -------- sym : Primary symbolic mask array access. """ return self._MaskArray # We can also add a __getitem__ call to access each mask def __getitem__(self, index): return self.sym[index]
[docs] def createMask(self, funcsList): """ Create a material-weighted symbolic expression from per-material values. This method creates a SymPy expression that combines multiple material properties using the index variable's symbolic masks. The result can be used directly in Underworld's solver equations. Parameters ---------- funcsList : list or tuple List of values or symbolic expressions, one per material index. Length must equal :attr:`indices`. Returns ------- sympy.Basic Symbolic expression: ``sum(funcsList[i] * mask[i] for i in indices)``. Raises ------ RuntimeError If ``funcsList`` is not a list/tuple or has wrong length. Examples -------- >>> # Define viscosity per material >>> viscosity = material.createMask([1e21, 1e20, 1e22]) # 3 materials >>> # Use in solver >>> stokes.constitutive_model.viscosity = viscosity See Also -------- visMask : Create visualization mask showing material indices. """ if not isinstance(funcsList, (tuple, list)): raise RuntimeError("Error input for createMask() - wrong type of input") if len(funcsList) != self.indices: raise RuntimeError("Error input for createMask() - wrong length of input") symo = sympy.S.Zero for i in range(self.indices): symo += funcsList[i] * self._MaskArray[i] return symo
[docs] def viewMask(self, expr): """ Decompose a masked expression into per-material components. .. note:: This method is not yet implemented. Currently returns None. Takes a symbolic expression created by :meth:`createMask` and extracts the individual material-specific components. Parameters ---------- expr : sympy.Basic A masked symbolic expression created by :meth:`createMask`. Returns ------- list or None List of symbolic expressions, one per material index. Currently returns None (not implemented). See Also -------- createMask : Create a masked expression from per-material values. """ # TODO: Implement decomposition of masked expressions # output = [] # for i in range(self.indices): # tmp = {} # for j in range(self.indices): # if i == j: pass # tmp # return output pass
[docs] def visMask(self): """ Create a visualization mask showing material indices. Returns a symbolic expression where each material region shows its index value (0, 1, 2, ...). Useful for visualization and debugging of material distributions. Returns ------- sympy.Basic Symbolic expression evaluating to material index at each point. Examples -------- >>> vis_field = material.visMask() >>> values = uw.function.evaluate(vis_field, swarm.data) >>> # values[i] gives material index at particle i See Also -------- createMask : Create arbitrary material-weighted expressions. """ return self.createMask(list(range(self.indices)))
[docs] def view(self): """ Show information on IndexSwarmVariable """ uw.pprint(f"IndexSwarmVariable {self}") uw.pprint(f"Numer of indices {self.indices}")
def _update_proxy_variables(self): """ This method updates the proxy mesh (vector) variable for the index variable on the current swarm locations Here is how it works: 1) for each particle, create a distance-weighted average on the node data 2) for each index in the set, we create a mask mesh variable by mapping 1.0 wherever the index matches and 0.0 where it does not. NOTE: If no material is identified with a given nodal value, the default is to impose a near-neighbour hunt for a valid material and set that one ## ToDo: This should be revisited to match the updated master copy of _update update_type 0: assign the particles to the nearest mesh_levelset nodes, and calculate the value on nodes from them. update_type 1: calculate the material property value on mesh_levelset nodes from the nearest N particles directly. """ if self.update_type == 0: # Use non-dimensional coordinates for internal level set KDTree kd = self._meshLevelSetVars[0]._get_kdtree() n_distance, n_indices = kd.query( self.swarm._particle_coordinates.data, k=self.nnn, sqr_dists=False ) kd_swarm = self.swarm._get_kdtree() # n, d, b = kd_swarm.find_closest_point(self._meshLevelSetVars[0].coords) d, n = kd_swarm.query(self._meshLevelSetVars[0].coords, k=1, sqr_dists=False) for ii in range(self.indices): meshVar = self._meshLevelSetVars[ii] with self.swarm.mesh.access(meshVar), self.swarm.access(): node_values = np.zeros((meshVar.data.shape[0],)) w = np.zeros((meshVar.data.shape[0],)) for i in range(self.swarm.local_size): tem = np.isclose(n_distance[i, :], n_distance[i, 0]) dist = n_distance[i, tem] indices = n_indices[i, tem] tem = dist < self.radius_s dist = dist[tem] indices = indices[tem] for j, ind in enumerate(indices): node_values[ind] += ( np.isclose(self.data[i], ii) / (1.0e-16 + dist[j]) )[0] w[ind] += 1.0 / (1.0e-16 + dist[j]) node_values[np.where(w > 0.0)[0]] /= w[np.where(w > 0.0)[0]] meshVar.data[:, 0] = node_values[...] # if there is no material found, # impose a near-neighbour hunt for a valid material and set that one ind_w0 = np.where(w == 0.0)[0] if len(ind_w0) > 0: ind_ = np.where(self.data[n[ind_w0]] == ii)[0] if len(ind_) > 0: meshVar.data[ind_w0[ind_]] = 1.0 elif self.update_type == 1: kd = uw.kdtree.KDTree(self.swarm._particle_coordinates.data) n_distance, n_indices = kd.query( self._meshLevelSetVars[0].coords, k=self.nnn, sqr_dists=False ) for ii in range(self.indices): meshVar = self._meshLevelSetVars[ii] node_values = np.zeros((meshVar.data.shape[0],)) w = np.zeros((meshVar.data.shape[0],)) for i in range(meshVar.data.shape[0]): if i not in self.ind_bc: ind = np.where(n_distance[i, :] < self.radius_s) a = 1.0 / (n_distance[i, ind] + 1.0e-16) w[i] = np.sum(a) b = np.isclose(self.data[n_indices[i, ind]], ii) node_values[i] = np.sum(np.dot(a, b)) if ind[0].size == 0: w[i] = 0 else: ind = np.where(n_distance[i, : self.nnn_bc] < self.radius_s) a = 1.0 / (n_distance[i, : self.nnn_bc][ind] + 1.0e-16) w[i] = np.sum(a) b = np.isclose(self.data[n_indices[i, : self.nnn_bc][ind]], ii) node_values[i] = np.sum(np.dot(a, b)) if ind[0].size == 0: w[i] = 0 node_values[np.where(w > 0.0)[0]] /= w[np.where(w > 0.0)[0]] meshVar.data[:, 0] = node_values[...] # if there is no material found, # impose a near-neighbour hunt for a valid material and set that one ind_w0 = np.where(w == 0.0)[0] if len(ind_w0) > 0: ind_ = np.where(self.data[n_indices[ind_w0]] == ii)[0] if len(ind_) > 0: meshVar.data[ind_w0[ind_]] = 1.0 return
## Import PIC-related classes from separate module to maintain compatibility # from .pic_swarm import PICSwarm, NodalPointPICSwarm, SwarmPICLayout ## This should be the basic swarm, and we can then create a sub-class that will ## be a PIC swarm # PICSwarm and NodalPointPICSwarm classes have been moved to pic_swarm.py ## New - Basic Swarm (no PIC skillz) ## What is missing: ## - no celldm ## - PIC layouts of particles are not directly available / must be done by hand ## - No automatic migration - must compute ranks for the particle swarms ## - No automatic definition of coordinate fields (need to add by hand)
[docs] class Swarm(Stateful, uw_object): """ A basic particle swarm implementation for Lagrangian particle tracking and data storage. The UW `Swarm` class provides a simplified particle management system that uses PETSc's DMSWARM_BASIC type. Unlike the standard `Swarm` class, this implementation does not rely on PETSc to determine ranks for particle migration but instead uses our own kdtree neighbour-domain computations. This class is preferred for most operations except where particle / cell relationships are always required. Parameters ---------- mesh : uw.discretisation.Mesh The mesh object that defines the computational domain for particle operations. Particles will be associated with this mesh for spatial queries and operations. recycle_rate : int, optional Rate at which particles are recycled for streak management. If > 1, enables streak particle functionality where particles are duplicated and tracked across multiple cycles. Default is 0 (no recycling). verbose : bool, optional Enable verbose output for debugging and monitoring particle operations. Default is False. Examples -------- Create a basic swarm and populate with particles: >>> import underworld3 as uw >>> mesh = uw.meshing.UnstructuredSimplexBox(minCoords=(0,0), maxCoords=(1,1)) >>> swarm = uw.swarm.Swarm(mesh=mesh) >>> swarm.populate(fill_param=2) Create a streak swarm with recycling: >>> streak_swarm = uw.swarm.Swarm(mesh=mesh, recycle_rate=5) >>> streak_swarm.populate(fill_param=1) Add custom particle data: >>> temperature = swarm.add_variable("temperature", 1) >>> velocity = swarm.add_variable("velocity", mesh.dim) Manual particle migration after coordinate updates: Note: particle migration is still called automatically when we `access` and update the particle_coordinates variables Note: `swarm.populate` uses a the mesh point locations for discontinuous interpolants to determine the particle locations. """ instances = 0
[docs] @timing.routine_timer_decorator def __init__(self, mesh, recycle_rate=0, verbose=False, clip_to_mesh=True): Swarm.instances += 1 self.verbose = verbose self._clip_to_mesh = clip_to_mesh # Store reference to model instead of direct mesh reference # This enables dynamic mesh handover while maintaining access to mesh services import underworld3 as uw model = uw.get_default_model() # Register mesh with model if not already present model._register_mesh(mesh) # Store reference to this swarm's specific mesh for proxy operations self._mesh_id = id(mesh) self._model_ref = weakref.ref(model) self.dim = mesh.dim self.cdim = mesh.cdim # Mesh version tracking for coordinate change detection self._mesh_version = mesh._mesh_version # Informational counter incremented at every particle-population # mutation site (populate, migrate, add_particles_*, advection # remesh). NOT a snapshot-restore invalidation gate — restore # rebuilds the local population from the snapshot regardless of # what happened in between. Useful for logging, debugging, and # any cache that wants to know "did the population change?" # See docs/developer/design/in_memory_checkpoint_design.md. self._population_generation = 0 # Register this swarm with the mesh for coordinate change notifications mesh.register_swarm(self) self.dm = PETSc.DMSwarm().create() self.dm.setDimension(self.dim) self.dm.setType(SwarmType.DMSWARM_BASIC.value) self._data = None # Add data structure to hold point location information in # an array with a callback that resets the relevant parts of the # swarm variable stack when the data structure is modified. self._coords = None #### # Is the swarm a streak-swarm ? self.recycle_rate = recycle_rate self.cycle = 0 # dictionary for variables # Using WeakValueDictionary to prevent circular references self._vars = weakref.WeakValueDictionary() # add variable to handle particle coords - match name from PIC_Swarm for consistency self._coord_var = SwarmVariable( "DMSwarmPIC_coor", self, self.cdim, dtype=float, _register=True, _proxy=False, rebuild_on_cycle=False, ) # add variable to handle particle ranks - this exists on the PETSc machinery already self._rank_var = SwarmVariable( "DMSwarm_rank", self, 1, dtype=int, _register=False, _proxy=False, rebuild_on_cycle=False, ) # add variable to hold swarm coordinates during position updates self._X0 = uw.swarm.SwarmVariable( "DMSwarm_X0", self, self.cdim, dtype=float, _register=True, _proxy=False, rebuild_on_cycle=False, ) # This is for swarm streak management: # add variable to hold swarm origins if self.recycle_rate > 1: self._remeshed = uw.swarm.SwarmVariable( "DMSwarm_remeshed", self, 1, dtype=int, _register=True, _proxy=False, rebuild_on_cycle=False, ) self._X0_uninitialised = True self._index = None self._nnmapdict = {} self._migration_disabled = False super().__init__() # Register with the same model already captured in self._model_ref # above (not a fresh ``get_default_model()`` call) so that # ``__del__`` deregisters from the same registry it registered with, # even if the default model is swapped mid-session. model._register_swarm(self)
def __del__(self): """Cleanup swarm: unregister from mesh and model, destroy the DM. Three steps: drop the mesh-side weak-set entry, drop the model-side registry entry (which also forgets any SwarmVariables that belonged to this swarm), and call ``self.dm.destroy()`` to release the PETSc DMSwarm and its registered fields. Historically the model registry kept a strong reference to every swarm and ``__del__`` did not call ``dm.destroy()`` — both leaks accumulated quickly inside time-stepping loops that build transient swarms (global expression evaluation, checkpoint reads, mesh adaptation transfers). The registry is now a ``WeakValueDictionary`` and ``__del__`` cleans up its own resources. """ try: if hasattr(self, "mesh") and self.mesh is not None: self.mesh.unregister_swarm(self) except (AttributeError, ReferenceError, RuntimeError): # Mesh/Model may have already been garbage collected, which is fine pass try: model = self._model_ref() if hasattr(self, "_model_ref") else None if model is not None: model._unregister_swarm(self) except (AttributeError, ReferenceError, RuntimeError): pass try: if hasattr(self, "dm") and self.dm is not None: self.dm.destroy() except Exception: # DM may already have been destroyed (e.g. by Mesh.adapt) or # PETSc may be finalising. Either way, nothing more we can do. pass def _invalidate_canonical_data(self): """Drop cached array views on the coordinates and every registered variable. Required after any operation that bypasses :meth:`Swarm.migrate` but still mutates particle layout — notably the bare ``self.dm.migrate(...)`` used by round-trip evaluation patterns. Calling :meth:`Swarm.migrate` already does this internally; call this directly only when you used the underlying DMSwarm migrate. """ if hasattr(self, "_particle_coordinates") and self._particle_coordinates is not None: self._particle_coordinates._canonical_data = None # ``self._vars`` may be a WeakValueDictionary whose values disappear # asynchronously during GC; iterate a snapshot. for var in list(self._vars.values()): if hasattr(var, "_canonical_data"): var._canonical_data = None # Invalidate cached spatial index self._kdtree = None def _get_kdtree(self): """ Return a cached KDTree for the swarm particle coordinates. Invalidated automatically whenever particles migrate or positions change. """ # Note: self.data returns unit-aware array if units are active, # but kdtree construction expects non-dimensional values. # Use _particle_coordinates.data directly. if not hasattr(self, "_kdtree") or self._kdtree is None: self._kdtree = uw.kdtree.KDTree(self._particle_coordinates.data) return self._kdtree def _route_by_nearest_centroid(self): """Migrate every particle to the rank whose domain-centroid is closest. This is a deterministic alternative to :meth:`migrate`: the destination is a pure function of the coordinate, computed identically on every rank. Two swarms migrated this way are guaranteed to place equal coordinates on the same rank — which the standard ``migrate`` does not guarantee at partition boundaries (vertex DOFs sitting on a shared face can return ``True`` from ``points_in_domain`` on multiple ranks). Used by checkpoint readers and any consumer that needs source data and query coordinates to converge on the same rank without relying on PETSc's DOF distribution. """ centroids = self.mesh._get_domain_centroids() centroid_kdt = uw.kdtree.KDTree(centroids) coords = self.dm.getField("DMSwarmPIC_coor").reshape(-1, self.dim).copy() self.dm.restoreField("DMSwarmPIC_coor") if coords.shape[0] > 0: _, owner_rank = centroid_kdt.query(coords, k=1, sqr_dists=False) rank_arr = self.dm.getField("DMSwarm_rank") # ``DMSwarm_rank`` shape varies by PETSc version: 1-D ``(N,)`` on # 3.21, 2-D ``(N, 1)`` on 3.25+. ``reshape(-1)`` flattens either # form into a writable view into the same buffer. rank_arr.reshape(-1)[:] = owner_rank.astype(rank_arr.dtype, copy=False) self.dm.restoreField("DMSwarm_rank") self.dm.migrate(remove_sent_points=True) uw.mpi.barrier() self._invalidate_canonical_data() @property def mesh(self): """The mesh this swarm operates on""" model = self._model_ref() if model is None: raise RuntimeError("Model has been garbage collected") return model.get_mesh(self._mesh_id) @mesh.setter def mesh(self, new_mesh): """ Assign swarm to a new mesh with dimensional validation and proxy updates. Parameters ---------- new_mesh : uw.discretisation.Mesh New mesh to assign this swarm to Raises ------ ValueError If new mesh has incompatible dimensions """ model = self._model_ref() if model is None: raise RuntimeError("Model has been garbage collected") # Register new mesh with model model._register_mesh(new_mesh) if id(new_mesh) == self._mesh_id: # Check if swarm is already compatible with target mesh if self.dim == new_mesh.dim and self.cdim == new_mesh.cdim: # Dimensions match, check if proxy variables need updating proxy_vars_updated = True for var in self._vars.values(): if ( hasattr(var, "_proxy") and var._proxy and hasattr(var, "_meshVar") and var._meshVar ): if var._meshVar.mesh is not new_mesh: proxy_vars_updated = False break if proxy_vars_updated: return # No change needed # Use swarm's current dimensions for validation (not model.mesh which may have been auto-updated) current_dim = self.dim current_cdim = self.cdim # Critical dimensional validation if new_mesh.dim != current_dim: raise ValueError( f"Cannot assign swarm to mesh with different coordinate dimension. " f"Current swarm dim={current_dim}, new mesh dim={new_mesh.dim}. " f"Swarm particles and variables are sized for {current_dim}D space." ) if new_mesh.cdim != current_cdim: raise ValueError( f"Cannot assign swarm to mesh with different embedding dimension. " f"Current swarm cdim={current_cdim}, new mesh cdim={new_mesh.cdim}." ) # Update model's mesh and handle all swarm transitions model._update_mesh_for_swarm(self, new_mesh) # Update swarm's mesh reference self._mesh_id = id(new_mesh) # Update swarm's cached dimensions self.dim = new_mesh.dim self.cdim = new_mesh.cdim # Recreate all proxy variables for new mesh for var in self._vars.values(): var._create_proxy_variable() # Safe for all variables (proxied or not) # Update mesh version tracking self._mesh_version = new_mesh._mesh_version @property def local_size(self): r"""Number of particles on this MPI rank. Returns ------- int Local particle count. See Also -------- dm.getLocalSize : Underlying PETSc method. """ return self.dm.getLocalSize() # We could probably use a global_size property too @property def data(self): r"""Particle coordinates (alias for :attr:`points`). .. deprecated:: 0.99.0 Use direct DM field access for particle coordinates. Returns ------- numpy.ndarray Particle coordinate array of shape ``(n_particles, dim)``. """ return self.points @property def points(self): """ Swarm particle coordinates in physical units. .. deprecated:: 0.99.0 Use swarm variables or direct DM access instead. ``swarm.points`` is being deprecated. When the mesh has coordinate scaling applied (via model units), this property automatically converts from internal model coordinates to physical coordinates for user access. When the mesh has coordinate units specified, returns a unit-aware array. Returns: numpy.ndarray or UnitAwareArray: Particle coordinates (with units if mesh.units is set) """ import warnings warnings.warn("swarm.points is deprecated", DeprecationWarning, stacklevel=2) # Check for mesh coordinate changes and trigger migration if needed if hasattr(self, "_mesh_version") and self._mesh_version != self.mesh._mesh_version: # Mesh coordinates have changed, force migration to update swarm self._force_migration_after_mesh_change() # Update our mesh version to match self._mesh_version = self.mesh._mesh_version # Get current coordinate data from PETSc (these are in model coordinates) model_coords = (self.dm.getField("DMSwarmPIC_coor").reshape((-1, self.dim))).copy() self.dm.restoreField("DMSwarmPIC_coor") # Apply scaling to convert model coordinates to physical coordinates if hasattr(self.mesh.CoordinateSystem, "_scaled") and self.mesh.CoordinateSystem._scaled: scale_factor = self.mesh.CoordinateSystem._length_scale coords = model_coords * scale_factor else: coords = model_coords # Cache and reuse NDArray_With_Callback object for consistent object identity if not hasattr(self, "_coords") or self._coords is None: # First access: create new NDArray_With_Callback object self._coords = uw.utilities.NDArray_With_Callback( coords, owner=self, disable_inplace_operators=True, ) # Define the callback function (only once) def swarm_update_callback(array, change_context): # print( # f"Swarm update callback - {self.dm.getLocalSize()}", # flush=True, # ) # Check if this operation may have changed data # Skip expensive operations for read-only sync operations data_changed = change_context.get("data_has_changed", True) if not data_changed: # print( # "Swarm callback: Skipping migration - read-only sync operation" # ) return # Check if sizes match before attempting to copy back petsc_size = self.dm.getLocalSize() points_size = array.shape[0] if petsc_size == points_size: # Update PETSc state # We could do this directly which would be more efficient and bypass the access manager (appropriately, here) self._coord_var.array[:, 0, :] = array[...] # Migrate by default (unless user has disabled it) if not self._migration_disabled: self.migrate() for var in self._vars.values(): var._update() else: # This means a migration call has been made before we have # had a chance to update the swarm consistently. This is an error # condition. We raise an exception to prevent further errors. print( f"Size mismatch: PETSc={petsc_size}, Points={points_size}\n", f"The swarm migration state has become corrupted", ) raise RuntimeError return # Add callback to the cached object self._coords.add_callback(swarm_update_callback) else: # Subsequent accesses: efficiently sync new coordinate data # This preserves callbacks and delay contexts, updating object reference if size # changed as a result of migration operations self._coords = self._coords.sync_data(coords) # Wrap with unit-aware array if mesh has units if hasattr(self.mesh, "units") and self.mesh.units is not None: from underworld3.utilities.unit_aware_array import UnitAwareArray return UnitAwareArray(self._coords, units=self.mesh.units) return self._coords @points.setter def points(self, value): """ Set swarm particle coordinates from physical units. .. deprecated:: 0.99.0 Use swarm variables or direct DM access instead. When the mesh has coordinate scaling applied (via model units), this property automatically converts from physical coordinates to internal model coordinates for PETSc storage. Args: value (numpy.ndarray): Particle coordinates in physical units """ import warnings warnings.warn("swarm.points is deprecated", DeprecationWarning, stacklevel=2) if value.shape[0] != self.local_size: raise TypeError( f"Points must be a numpy array with the same size as the swarm", f" - partial allocation to the swarm may trigger migration or point removal", f" - either change all the swarm points at once or use the `with migration_control()` manager", ) # Apply inverse scaling to convert physical coordinates to model coordinates if hasattr(self.mesh.CoordinateSystem, "_scaled") and self.mesh.CoordinateSystem._scaled: scale_factor = self.mesh.CoordinateSystem._length_scale model_coords = value / scale_factor else: model_coords = value # Update the cached NDArray (triggers callback) - use physical coordinates for cache self._coords[...] = value[...] # Update PETSc DM field directly with model coordinates for immediate consistency coords = self.dm.getField("DMSwarmPIC_coor").reshape((-1, self.dim)) coords[...] = model_coords[...] self.dm.restoreField("DMSwarmPIC_coor") # @points.setter # def points(self, value): # if isinstance(value, np.ndarray): # if value.shape[0] != self.local_size: # message = ( # "Points must be a numpy array with the same size as the swarm." # + "Partial allocation to the swarm may trigger particle migration" # + "either change all the swarm points at once or use the `with migration_disabled()` manager", # ) # raise TypeError(message) # self._coords[...] = value[...] @property def _particle_coordinates(self): return self._coord_var @property def coords(self): """ Swarm particle coordinates in physical units. This is the primary public interface for accessing particle coordinates. Coordinates are automatically converted from internal model units to physical units based on the model's reference quantities. Returns ------- UWQuantity or numpy.ndarray Particle coordinates in physical units with shape (n_particles, dim). If model has reference quantities, returns UWQuantity with appropriate length units. Otherwise returns plain array. Notes ----- - Coordinates are converted from model units to physical units automatically - For internal use with model units, access `swarm._particle_coordinates.data` - Setting coordinates accepts either physical units or plain numbers Examples -------- >>> coords_physical = swarm.coords # Get physical coordinates >>> swarm.coords = new_coords_with_units # Set from physical units See Also -------- swarm.units : Get the unit specification for coordinates """ # Get internal model-unit coordinates model_coords = self._particle_coordinates.data # Convert to physical units if reference quantities are set import underworld3 as uw model = uw.get_default_model() # If no reference quantities, return raw coordinates (model units = physical units) if not model.has_units_active(): return model_coords # Try to convert to physical units; fall back to raw coordinates if # length scale is not defined (e.g., only temperature_diff was set) try: return uw.scaling.dimensionalise(model_coords, uw.units.meter) except ValueError: # Length scale not available - return raw model-unit coordinates return model_coords @coords.setter def coords(self, value): """ Set swarm particle coordinates from physical units. Accepts coordinates with units or plain numbers. If units are provided, they are converted to model units automatically. If plain numbers are provided, they are assumed to be in the correct unit system. Parameters ---------- value : array-like or UWQuantity New coordinates. Can be: - Array with units (e.g., values * uw.units.km) - Plain array (assumed to be in model units or physical units depending on context) """ import underworld3 as uw # Convert physical → non-dimensional units model_coords = uw.scaling.non_dimensionalise(value) # Set internal coordinates self._particle_coordinates.data[...] = model_coords @property def units(self): """ Unit specification for swarm coordinates. Returns the physical unit string for coordinates based on the model's reference quantities. This indicates what units the coordinates are in when accessed via the `coords` property. Returns ------- str or None Unit string for coordinates (e.g., 'kilometer', 'meter'), or None if no reference quantities are set Examples -------- >>> print(swarm.units) # 'kilometer' if length_scale was set in km >>> coords = swarm.coords # Coordinates in kilometers """ # Coordinates have length dimensions import underworld3 as uw model = uw.get_default_model() # Check if model has reference quantities if not hasattr(model, "_pint_registry"): return None # Get length scale from model try: scales = model.get_fundamental_scales() if "length" in scales: length_scale = scales["length"] if hasattr(length_scale, "_pint_qty"): return str(length_scale._pint_qty.units) elif hasattr(length_scale, "units"): return str(length_scale.units) except: pass return None @property def clip_to_mesh(self): """ Whether particles are clipped to remain within mesh boundaries. When True (default), particles that move outside the mesh domain during advection or coordinate updates are removed or repositioned to stay within bounds. When False, particles can exist outside the mesh domain. Returns ------- bool Current clipping state. See Also -------- dont_clip_to_mesh : Context manager to temporarily disable clipping. """ return self._clip_to_mesh @clip_to_mesh.setter def clip_to_mesh(self, value): """Set whether particles should be clipped to mesh boundaries.""" self._clip_to_mesh = bool(value)
[docs] def dont_clip_to_mesh(self): """ Context manager that temporarily disables mesh clipping for the swarm. `swarm.migrate` is called automatically when exiting the context. Usage: with swarm.dont_clip_to_mesh(): # swarm operations that should not be clipped to mesh swarm.data = new_positions """ class _ClipToggleContext: def __init__(self, swarm): self.swarm = swarm self.original_value = None def __enter__(self): self.original_value = self.swarm._clip_to_mesh self.swarm._clip_to_mesh = False return self def __exit__(self, exc_type, exc_val, exc_tb): self.swarm._clip_to_mesh = self.original_value self.swarm.migrate() return _ClipToggleContext(self)
[docs] def migration_disabled(self): """ Legacy context manager that completely disables migration. Use migration_control(disable=True) for new code. Context manager that temporarily disables particle migration for the swarm. Migration is NOT called when exiting the context. Usage: with swarm.migration_disabled(): # swarm operations that should not trigger migration swarm.data = new_positions # ... other operations ... # migrate() will be skipped during these operations """ return self.migration_control(disable=True)
[docs] def migration_control(self, disable=False): """ Context manager to control particle migration behavior. Parameters ---------- disable : bool If False (default), migration is deferred until context exit. If True, migration is completely disabled. Examples -------- Defer migration until end (default):: with swarm.migration_control(): swarm.points[mask1] += delta1 swarm.points[mask2] *= scale # Migration happens HERE on exit Completely disable migration:: with swarm.migration_control(disable=True): # Operations where migration should never happen # No migration on exit """ class _MigrationControlContext: def __init__(self, swarm, disable): self.swarm = swarm self.disable = disable self.original_value = None self.initial_size = None def __enter__(self): self.original_value = self.swarm._migration_disabled self.swarm._migration_disabled = True if not self.disable: self.initial_size = self.swarm.local_size return self def __exit__(self, exc_type, exc_val, exc_tb): self.swarm._migration_disabled = self.original_value # Perform deferred migration if not disabled and not still blocked if not self.disable and not self.swarm._migration_disabled: # Check if particle positions might have changed if self.swarm.local_size == self.initial_size: self.swarm.migrate() for var in self.swarm._vars.values(): var._update() return _MigrationControlContext(self, disable)
@timing.routine_timer_decorator @uw.collective_operation def populate( self, fill_param: Optional[int] = 1, ): """ Populate the swarm with particles throughout the domain. Parameters ---------- fill_param: Parameter determining the particle count per cell (per dimension) for the given layout, using the mesh degree. Raises ------ RuntimeError If the swarm has already been initialized with particles. """ if self.local_size > 0: raise RuntimeError( f"Cannot populate swarm that already has {self.local_size} particles. " "populate() is only for swarm initialization." ) self.fill_param = fill_param newp_coords0 = self.mesh._get_coords_for_basis(fill_param, continuous=False) newp_cells0 = self.mesh.get_closest_local_cells(newp_coords0) valid = newp_cells0 != -1 newp_coords = newp_coords0[valid] newp_cells = newp_cells0[valid] self.dm.finalizeFieldRegister() # PETSc < 3.24 has an off-by-one bug in addNPoints when swarm size is initially zero # It allocates N-1 instead of N, so we add +1 to compensate # PETSc 3.24+ fixed this bug, so we use the exact count from petsc4py import PETSc if PETSc.Sys.getVersion() >= (3, 24, 0): self.dm.addNPoints(newp_coords.shape[0]) else: self.dm.addNPoints(newp_coords.shape[0] + 1) coords = self.dm.getField("DMSwarmPIC_coor").reshape((-1, self.dim)) ranks = self.dm.getField("DMSwarm_rank") coords[...] = newp_coords[...] ranks[...] = uw.mpi.rank self.dm.restoreField("DMSwarmPIC_coor") self.dm.restoreField("DMSwarm_rank") if self.recycle_rate > 1: with self.access(): # This is a mesh-local quantity, so let's just # store it on the mesh in an ad_hoc fashion for now self.mesh.particle_X_orig = self._particle_coordinates.data.copy() with self.access(): swarm_orig_size = self.local_size all_local_coords = np.vstack( (self._particle_coordinates.data,) * (self.recycle_rate) ) swarm_new_size = all_local_coords.shape[0] self.dm.addNPoints(swarm_new_size - swarm_orig_size) coords = self.dm.getField("DMSwarmPIC_coor").reshape((-1, self.dim)) # Compute perturbation - extract magnitude if coordinates have units # numpy.array(..., dtype=float64) forces conversion to plain array coord_data = np.array(all_local_coords, dtype=np.float64) search_lengths = np.array(self.mesh._search_lengths[all_local_cells], dtype=np.float64) perturbation = ( (0.33 / (1 + fill_param)) * (np.random.random(size=coord_data.shape) - 0.5) * 0.00001 * search_lengths # typical cell size ) # Add perturbation (coords array stores dimensionless values) coords[...] = coord_data + perturbation self.dm.restoreField("DMSwarmPIC_coor") ## Now set the cycle values with self.access(self._remeshed): for i in range(0, self.recycle_rate): offset = swarm_orig_size * i self._remeshed.data[offset::, 0] = i # Informational: particle population just changed. self._population_generation += 1 return @timing.routine_timer_decorator @uw.collective_operation def migrate( self, remove_sent_points=True, delete_lost_points=None, max_its=10, ): """ Migrate swarm across processes after coordinates have been updated. The algorithm uses a global kD-tree for the centroids of the domains to decide the particle mpi.rank (send to the closest) If the particles are mis-assigned to a particular mpi.rank, the next choice is the second-closest and so on. A few particles are still not found after this distribution process which probably means they are just outside the mesh. If some points remain lost, they will be deleted if `delete_lost_points` is set. Implementation note: We retained (above) the name `DMSwarmPIC_coor` for the particle field to allow this routine to be inherited by a PIC swarm which has this field pre-defined. (We'd need to add a cellid field as well, and re-compute it upon landing) Note: This is a COLLECTIVE operation - all MPI ranks must call it. """ if self._migration_disabled: return # Informational: migration may move or drop particles. Bump # unconditionally; restore is not gated on this counter so a # conservative no-op bump is harmless. self._population_generation += 1 from time import time if delete_lost_points is None: delete_lost_points = self.clip_to_mesh mesh_domain_kdtree = self.mesh._get_domain_kdtree() # This will only worry about particles that are not already claimed ! # swarm_coord_array = (self.dm.getField("DMSwarmPIC_coor").reshape((-1, self.dim))).copy() self.dm.restoreField("DMSwarmPIC_coor") in_or_not = self.mesh.points_in_domain( swarm_coord_array, ) num_points_in_domain = np.count_nonzero(in_or_not == True) num_points_not_in_domain = np.count_nonzero(in_or_not == False) not_my_points = np.where(in_or_not == False)[0] uw.mpi.barrier() global_unclaimed_points = int( uw.utilities.gather_data(num_points_not_in_domain, bcast=True, dtype=int).sum() ) global_claimed_points = int( uw.utilities.gather_data(num_points_in_domain, bcast=True, dtype=int).sum() ) # Unlikely, but we should check this uw.mpi.barrier() if global_unclaimed_points == 0: return # Migrate particles between processes (if there are more than one of them) if uw.mpi.size > 1: for it in range(0, min(max_its, uw.mpi.size)): # Send unclaimed points to next processor in line swarm_rank_array = self.dm.getField("DMSwarm_rank") swarm_coord_array = self.dm.getField("DMSwarmPIC_coor").reshape(-1, self.dim) if not_my_points.shape[0] > 0: dist, rank = mesh_domain_kdtree.query( swarm_coord_array[not_my_points], k=it + 1, sqr_dists=False ) swarm_rank_array.reshape(-1)[not_my_points] = rank.reshape( -1, it + 1 )[:, it].flatten() self.dm.restoreField("DMSwarm_rank") self.dm.restoreField("DMSwarmPIC_coor") # Now we send the points (basic migration) self.dm.migrate(remove_sent_points=True) uw.mpi.barrier() swarm_coord_array = self.dm.getField("DMSwarmPIC_coor").reshape(-1, self.dim) in_or_not = self.mesh.points_in_domain(swarm_coord_array) self.dm.restoreField("DMSwarmPIC_coor") num_points_in_domain = np.count_nonzero(in_or_not == True) num_points_not_in_domain = np.count_nonzero(in_or_not == False) not_my_points = np.where(in_or_not == False)[0] unclaimed_points_last_iteration = global_unclaimed_points claimed_points_last_iteration = global_claimed_points uw.mpi.barrier() global_unclaimed_points = int( uw.utilities.gather_data( num_points_not_in_domain, bcast=True, dtype=int, ).sum() ) global_claimed_points = int( uw.utilities.gather_data(num_points_in_domain, bcast=True, dtype=int).sum() ) if global_unclaimed_points == 0: break if ( global_unclaimed_points == unclaimed_points_last_iteration and global_claimed_points == claimed_points_last_iteration ): break # Missing points for deletion if required if delete_lost_points: uw.mpi.barrier() if len(not_my_points) > 0: indices = np.sort(not_my_points)[::-1] for index in indices: self.dm.removePointAtIndex(index) # Invalidate all cached data after migration. # Any particle movement (send, receive, or balanced swap) makes # cached arrays stale — both size and values may have changed. self._invalidate_canonical_data() return def _force_migration_after_mesh_change(self): """ Force migration of swarm particles after mesh coordinate changes. This method bypasses the normal migration_disabled check since mesh coordinate changes require swarm particles to be re-distributed regardless of migration disabled state. """ # Temporarily override migration disabled state original_migration_disabled = self._migration_disabled self._migration_disabled = False try: # Disable variable array callbacks during migration to prevent corruption # Collect all variable arrays and disable their callbacks disabled_arrays = [] for var in self._vars.values(): if hasattr(var, "_array_cache") and var._array_cache is not None: var._array_cache.disable_callbacks() disabled_arrays.append(var._array_cache) try: # Perform standard migration self.migrate(remove_sent_points=True, delete_lost_points=True) finally: # Re-enable variable array callbacks for array_cache in disabled_arrays: array_cache.enable_callbacks() finally: # Restore original migration disabled state self._migration_disabled = original_migration_disabled
[docs] @timing.routine_timer_decorator def add_particles_with_coordinates(self, coordinatesArray) -> int: """Add particles at given coordinates, keeping only locally-owned points. Each rank filters the input array and adds only the points that fall within its local domain partition. Non-local points are silently ignored, so it is safe to pass the same global coordinate array to every rank — no duplicates will be created. This is the recommended method for user code. For pre-partitioned data where each rank already holds only its own points, this method also works correctly. Parameters ---------- coordinatesArray : numpy.ndarray Coordinates of new particles, shape ``(n, dim)``. Returns -------- npoints : int Number of points actually added on this rank. """ if not isinstance(coordinatesArray, np.ndarray): raise TypeError("'coordinateArray' must be provided as a numpy array") if not len(coordinatesArray.shape) == 2: raise ValueError("The 'coordinateArray' is expected to be two dimensional.") if not coordinatesArray.shape[1] == self.mesh.dim: #### petsc appears to ignore columns that are greater than the mesh dim, but still worth including raise ValueError( """The 'coordinateArray' must have shape n*dim, where 'n' is the number of particles to add, and 'dim' is the dimensionality of the supporting mesh ({}).""".format( self.mesh.dim ) ) valid = self.mesh.points_in_domain(coordinatesArray, strict_validation=True) valid_coordinates = coordinatesArray[valid] npoints = len(valid_coordinates) swarm_size = self.dm.getLocalSize() # -1 means no particles have been added yet (PETSc interface change) if swarm_size == -1: swarm_size = 0 # PETSc < 3.24 has an off-by-one bug in addNPoints when swarm size is initially zero # It allocates N-1 instead of N, so we add +1 to compensate # PETSc 3.24+ fixed this bug, so we use the exact count from petsc4py import PETSc if PETSc.Sys.getVersion() < (3, 24, 0): npoints = npoints + 1 self.dm.finalizeFieldRegister() self.dm.addNPoints(npoints=npoints) if npoints > 0: coords = self.dm.getField("DMSwarmPIC_coor").reshape((-1, self.dim)) ranks = self.dm.getField("DMSwarm_rank") coords[swarm_size::, :] = valid_coordinates[:, :] ranks[swarm_size::] = uw.mpi.rank self.dm.restoreField("DMSwarm_rank") self.dm.restoreField("DMSwarmPIC_coor") # Here we update the swarm cycle values as required if self.recycle_rate > 1: with self.access(self._remeshed): # self._Xorig.data[...] = coordinatesArray self._remeshed.data[...] = 0 self.dm.migrate(remove_sent_points=True) # Invalidate cached data — particle count changed after addNPoints + migrate self._particle_coordinates._canonical_data = None for var in self._vars.values(): if hasattr(var, "_canonical_data"): var._canonical_data = None # Informational: addNPoints + direct dm.migrate path doesn't go # through Swarm.migrate, so bump explicitly. self._population_generation += 1 return npoints
[docs] @timing.routine_timer_decorator def add_particles_with_global_coordinates( self, globalCoordinatesArray, migrate=True, delete_lost_points=True, ) -> int: """Add particles on every rank, then migrate to correct owners. Every rank inserts **all** supplied points into the local swarm, then ``dm.migrate()`` moves each particle to the rank that owns its spatial location. If the same array is passed on every rank, this produces one copy of each point in the correct partition — but calling it with *different* arrays per rank will accumulate all of them. This is primarily an internal method used by global-evaluation and mesh-transfer utilities. For general use, prefer :meth:`add_particles_with_coordinates`, which filters non-local points automatically and never creates duplicates. Parameters ---------- globalCoordinatesArray : numpy.ndarray Coordinates of new particles, shape ``(n, dim)``. migrate : bool Run PETSc swarm migration after insertion (default True). delete_lost_points : bool Remove particles that fall outside any rank's domain during migration (default True). Returns -------- npoints : int Number of points added on this rank before migration. """ if not isinstance(globalCoordinatesArray, np.ndarray): raise TypeError("'coordinateArray' must be provided as a numpy array") if not len(globalCoordinatesArray.shape) == 2: raise ValueError("The 'coordinateArray' is expected to be two dimensional.") if not globalCoordinatesArray.shape[1] == self.mesh.dim: #### petsc appears to ignore columns that are greater than the mesh dim, but still worth including raise ValueError( """The 'coordinateArray' must have shape n*dim, where 'n' is the number of particles to add, and 'dim' is the dimensionality of the supporting mesh ({}).""".format( self.mesh.dim ) ) npoints = len(globalCoordinatesArray) swarm_size = self.dm.getLocalSize() # -1 means no particles have been added yet if swarm_size == -1: swarm_size = 0 # PETSc < 3.24 has an off-by-one bug in addNPoints when swarm size is initially zero # It allocates N-1 instead of N, so we add +1 to compensate # PETSc 3.24+ fixed this bug, so we use the exact count from petsc4py import PETSc if PETSc.Sys.getVersion() < (3, 24, 0): npoints = npoints + 1 self.dm.finalizeFieldRegister() self.dm.addNPoints(npoints=npoints) # Informational: population changed even if migrate=False is # passed (in which case Swarm.migrate's bump wouldn't fire). self._population_generation += 1 # Add new points with provided coords # Record the current rank (migration needs to know where we start from !) coords = self.dm.getField("DMSwarmPIC_coor").reshape((-1, self.dim)) ranks = self.dm.getField("DMSwarm_rank") coords[swarm_size::, :] = globalCoordinatesArray[:, :] ranks[swarm_size::] = uw.mpi.rank self.dm.restoreField("DMSwarm_rank") self.dm.restoreField("DMSwarmPIC_coor") # Here we update the swarm cycle values as required if self.recycle_rate > 1: with self.access(self._remeshed): # self._Xorig.data[...] = globalCoordinatesArray self._remeshed.data[...] = 0 if migrate: self.migrate(remove_sent_points=True, delete_lost_points=delete_lost_points) return npoints
[docs] @timing.routine_timer_decorator def save( self, filename: int, compression: Optional[bool] = False, compressionType: Optional[str] = "gzip", force_sequential=False, ): """ Save the swarm coordinates to a h5 file. Parameters ---------- filename : The filename of the swarm checkpoint file to save to disk. compression : Add compression to the h5 files (saves space but increases write times with increasing no. of processors) compressionType : Type of compression to use, 'gzip' and 'lzf' supported. 'gzip' is default. Compression also needs to be set to 'True'. """ if h5py.h5.get_config().mpi == False and comm.size > 1 and comm.rank == 0: warnings.warn( "Collective IO not possible as h5py not available in parallel mode. Switching to sequential. This will be slow for models running on multiple processors", stacklevel=2, ) if filename.endswith(".h5") == False: raise RuntimeError("The filename must end with .h5") if compression == True and comm.rank == 0: warnings.warn("Compression may slow down write times", stacklevel=2) if h5py.h5.get_config().mpi == True and not force_sequential: # BUGFIX(#151): the previous parallel path called # h5f.create_dataset("coordinates", data=points_data_copy) # collectively, but each rank passed its own local-sized array. # In parallel HDF5 every rank must specify the *same* dataset # shape on a collective create_dataset; passing different shapes # leaves HDF5's internal metadata inconsistent so the collective # close never synchronises, producing a silent hang. # # Fix: allgather the per-rank sizes, create the dataset at the # global shape, then each rank writes its own slice. points_data_copy = self._particle_coordinates.data[:].copy() local_n = points_data_copy.shape[0] cdim = points_data_copy.shape[1] sizes = comm.allgather(local_n) total_n = sum(sizes) offset = sum(sizes[: comm.rank]) with h5py.File(f"{filename[:-3]}.h5", "w", driver="mpio", comm=comm) as h5f: if compression == True: dset = h5f.create_dataset( "coordinates", shape=(total_n, cdim), dtype=points_data_copy.dtype, chunks=True, compression=compressionType, ) else: dset = h5f.create_dataset( "coordinates", shape=(total_n, cdim), dtype=points_data_copy.dtype, ) if local_n > 0: dset[offset : offset + local_n] = points_data_copy del points_data_copy else: # Sequential fallback: rank 0 creates the file and writes its slab, # then each higher rank appends in turn. points_data_copy = self.points[:].copy() local_n = points_data_copy.shape[0] if comm.rank == 0: with h5py.File(f"{filename[:-3]}.h5", "w") as h5f: if compression == True: h5f.create_dataset( "coordinates", data=points_data_copy, chunks=True, maxshape=(None, points_data_copy.shape[1]), compression=compressionType, ) else: h5f.create_dataset( "coordinates", data=points_data_copy, chunks=True, maxshape=(None, points_data_copy.shape[1]), ) comm.barrier() for i in range(1, comm.size): if comm.rank == i and local_n > 0: # BUGFIX(#151): the previous version referenced an undefined # ``data_copy`` here; passive swarms with a zero-particle # rank would have raised NameError. Use the local # ``points_data_copy`` we already have. with h5py.File(f"{filename[:-3]}.h5", "a") as h5f: existing_size = h5f["coordinates"].shape[0] h5f["coordinates"].resize((existing_size + local_n), axis=0) h5f["coordinates"][existing_size:] = points_data_copy comm.barrier() del points_data_copy ## Add swarm coordinate unit metadata to the file import json # Use preferred selective_ranks pattern for coordinate metadata with uw.selective_ranks(0) as should_execute: if should_execute: with h5py.File(f"{filename[:-3]}.h5", "a") as h5f: # Add swarm coordinate unit metadata swarm_coord_metadata = { "coordinate_units": ( str(self.coordinate_units) if hasattr(self, "coordinate_units") else None ), "coordinate_dimensionality": ( str(self.coordinate_dimensionality) if hasattr(self, "coordinate_dimensionality") else None ), "swarm_type": type(self).__name__, "mesh_type": type(self.mesh).__name__ if hasattr(self, "mesh") else None, "dimension": self.dim, } # Store in coordinates dataset attributes if "coordinates" in h5f: h5f["coordinates"].attrs["swarm_metadata"] = json.dumps( swarm_coord_metadata ) return
[docs] @timing.routine_timer_decorator def read_timestep( self, base_filename: str, swarm_id: str, index: int, outputPath: Optional[str] = "", migrate=True, ): output_base_name = os.path.join(outputPath, base_filename) swarm_file = output_base_name + f".{swarm_id}.{index:05}.h5" ### open up file with coords on all procs with h5py.File(f"{swarm_file}", "r") as h5f: coordinates = h5f["coordinates"][:] # We make it possible not to migrate the swarm because this # will also delete points outside the mesh. We may not want to do # that (either for debugging / visualisation, or when adapting the mesh) self.add_particles_with_global_coordinates(coordinates, migrate=migrate) return
[docs] @timing.routine_timer_decorator def add_variable( self, name, size=1, dtype=float, proxy_degree=2, _nn_proxy=False, units=None, ): """ Add a variable to the swarm. Variables must be created before the swarm is populated with particles. Once swarm.populate() or similar methods are called, PETSc finalizes field registration and no new variables can be added. Parameters ---------- name : str Variable name size : int, default 1 Number of components (1 for scalar, 2-3 for vector, etc.) dtype : type, default float Data type (float or int) proxy_degree : int, default 2 Degree for mesh proxy variable interpolation _nn_proxy : bool, default False Internal parameter for nearest-neighbor proxy units : str, optional Physical units for this variable (e.g., "kg/m^3", "m/s") Returns ------- SwarmVariable The created swarm variable Raises ------ RuntimeError If swarm is already populated with particles Examples -------- Correct usage: >>> swarm = uw.swarm.Swarm(mesh) >>> material = swarm.add_variable("material", 1, dtype=int) >>> temperature = swarm.add_variable("temperature", 1) >>> swarm.populate(fill_param=3) # Populate after creating variables Incorrect usage (will raise error): >>> swarm = uw.swarm.Swarm(mesh) >>> swarm.populate(fill_param=3) >>> material = swarm.add_variable("material", 1) # ERROR! """ # Check early to provide a clear error message if self.local_size > 0: raise RuntimeError( f"Cannot add variable '{name}' to swarm: swarm is already populated " f"with {self.local_size} particles. Variables must be created " f"before calling swarm.populate() or any other operation that adds particles.\n" f"\nCorrect usage:\n" f" swarm = uw.swarm.Swarm(mesh)\n" f" variable = swarm.add_variable('{name}', {size}) # Create variables first\n" f" swarm.populate(fill_param=3) # Then populate with particles" ) return SwarmVariable( name, self, size, dtype=dtype, proxy_degree=proxy_degree, _nn_proxy=_nn_proxy, units=units, )
[docs] @timing.routine_timer_decorator def petsc_save_checkpoint( self, swarmName: str, index: int, outputPath: Optional[str] = "", ): """ Use PETSc to save the swarm and attached data to a .pbin and xdmf file. Parameters ---------- swarmName : Name of the swarm to save. index : An index which might correspond to the timestep or output number (for example). outputPath : Path to save the data. If left empty it will save the data in the current working directory. """ x_swarm_fname = f"{outputPath}{swarmName}_{index:05d}.xmf" self.dm.viewXDMF(x_swarm_fname)
[docs] @timing.routine_timer_decorator def write_timestep( self, filename: str, swarmname: str, index: int, swarmVars: Optional[list] = None, outputPath: Optional[str] = "", time: Optional[int] = None, compression: Optional[bool] = False, compressionType: Optional[str] = "gzip", force_sequential: Optional[bool] = False, ): """ Save data to h5 and a corresponding xdmf for visualisation using h5py. Parameters ---------- swarmName : Name of the swarm to save. swarmVars : List of swarm objects to save. index : An index which might correspond to the timestep or output number (for example). outputPath : Path to save the data. If left empty it will save the data in the current working directory. time : Attach the time to the generated xdmf. compression : Whether to compress the h5 files [bool]. compressionType : The type of compression to use. 'gzip' and 'lzf' are the supported types, with 'gzip' as the default. """ # This will eliminate the issue of whether or not to put path separators in the # outputPath. Also does the right thing if outputPath is "" output_base_name = os.path.join(outputPath, filename) + "." + swarmname # check the directory where we will write checkpoint dir_path = os.path.dirname(output_base_name) # get directory # check if path exists if os.path.exists(os.path.abspath(dir_path)): # easier to debug abs pass else: raise RuntimeError(f"{os.path.abspath(dir_path)} does not exist") # check if we have write access if os.access(os.path.abspath(dir_path), os.W_OK): pass else: raise RuntimeError(f"No write access to {os.path.abspath(dir_path)}") # could also try to coerce this to be a list and raise if it fails (tuple, singleton ... ) # also ... why the typechecking if this can still happen if swarmVars is not None and not isinstance(swarmVars, list): raise RuntimeError("`swarmVars` does not appear to be a list.") else: ### save the swarm particle location self.save( filename=f"{output_base_name}.{index:05d}.h5", compression=compression, compressionType=compressionType, force_sequential=force_sequential, ) #### Generate a h5 file for each field if swarmVars != None: for field in swarmVars: field.save( filename=f"{output_base_name}.{field.name}.{index:05d}.h5", compression=compression, compressionType=compressionType, force_sequential=force_sequential, ) if uw.mpi.rank == 0: ### only need to combine the h5 files to a single xdmf on one proc with open(f"{output_base_name}.{index:05d}.xdmf", "w") as xdmf: # Write the XDMF header xdmf.write('<?xml version="1.0" ?>\n') xdmf.write('<Xdmf xmlns:xi="http://www.w3.org/2001/XInclude" Version="2.0">\n') xdmf.write("<Domain>\n") xdmf.write(f'<Grid Name="{output_base_name}.{index:05d}" GridType="Uniform">\n') if time != None: xdmf.write(f' <Time Value="{time}" />\n') # Write the grid element for the HDF5 dataset with h5py.File(f"{output_base_name}.{index:05}.h5", "r") as h5f: xdmf.write( f' <Topology Type="POLYVERTEX" NodesPerElement="{h5f["coordinates"].shape[0]}"> </Topology>\n' ) if h5f["coordinates"].shape[1] == 2: xdmf.write(' <Geometry Type="XY">\n') elif h5f["coordinates"].shape[1] == 3: xdmf.write(' <Geometry Type="XYZ">\n') xdmf.write( f' <DataItem Format="HDF" NumberType="Float" Precision="8" Dimensions="{h5f["coordinates"].shape[0]} {h5f["coordinates"].shape[1]}">{os.path.basename(h5f.filename)}:/coordinates</DataItem>\n' ) xdmf.write(" </Geometry>\n") # Write the attribute element for the field if swarmVars != None: for field in swarmVars: with h5py.File( f"{output_base_name}.{field.name}.{index:05d}.h5", "r" ) as h5f: if h5f["data"].dtype == np.int32: xdmf.write( f' <Attribute Type="Scalar" Center="Node" Name="{field.name}">\n' ) xdmf.write( f' <DataItem Format="HDF" NumberType="Int" Precision="4" Dimensions="{h5f["data"].shape[0]} {h5f["data"].shape[1]}">{os.path.basename(h5f.filename)}:/data</DataItem>\n' ) elif h5f["data"].shape[1] == 1: xdmf.write( f' <Attribute Type="Scalar" Center="Node" Name="{field.name}">\n' ) xdmf.write( f' <DataItem Format="HDF" NumberType="Float" Precision="8" Dimensions="{h5f["data"].shape[0]} {h5f["data"].shape[1]}">{os.path.basename(h5f.filename)}:/data</DataItem>\n' ) elif h5f["data"].shape[1] == 2 or h5f["data"].shape[1] == 3: xdmf.write( f' <Attribute Type="Vector" Center="Node" Name="{field.name}">\n' ) xdmf.write( f' <DataItem Format="HDF" NumberType="Float" Precision="8" Dimensions="{h5f["data"].shape[0]} {h5f["data"].shape[1]}">{os.path.basename(h5f.filename)}:/data</DataItem>\n' ) else: xdmf.write( f' <Attribute Type="Tensor" Center="Node" Name="{field.name}">\n' ) xdmf.write( f' <DataItem Format="HDF" NumberType="Float" Precision="8" Dimensions="{h5f["data"].shape[0]} {h5f["data"].shape[1]}">{os.path.basename(h5f.filename)}:/data</DataItem>\n' ) xdmf.write(" </Attribute>\n") else: pass # Write the XDMF footer xdmf.write("</Grid>\n") xdmf.write("</Domain>\n") xdmf.write("</Xdmf>\n")
@property def vars(self): r"""List of SwarmVariables attached to this swarm. Returns ------- list List of :class:`SwarmVariable` objects defined on this swarm. """ return self._vars # ----- Unitary snapshot / restore ----- # # See ``src/underworld3/checkpoint/snapshot.py`` and # ``docs/developer/design/in_memory_checkpoint_design.md``. Capture # records the per-rank particle layout and user-variable arrays. # Restore rebuilds the local population from the snapshot rather # than refusing on counter mismatch — restore is precisely for the # case where particles have moved / migrated / been repopulated. def _snapshot_stable_name(self) -> str: """Per-process stable name. ``instance_number`` comes from uw_object.""" return f"swarm_{self.instance_number}"
[docs] def snapshot_payload(self) -> dict: """Return a self-contained dict describing this swarm's state. Captured: per-rank particle coordinates (from ``DMSwarmPIC_coor``) and every user swarm-variable's data array. PETSc-internal variables (``DMSwarmPIC_coor``, ``DMSwarm_X0``, ``DMSwarm_remeshed``) are excluded — their contents either come from the captured coords or are regenerated on the next solve. """ coord_field = self.dm.getField("DMSwarmPIC_coor").reshape( (-1, self.dim) ) coords = np.asarray(coord_field).copy() self.dm.restoreField("DMSwarmPIC_coor") var_arrays: dict = {} for var in list(self._vars.values()): if var.name.startswith("DMSwarm"): continue var_arrays[var.clean_name] = np.asarray(var.data).copy() return { "name": self._snapshot_stable_name(), "mesh_name": self.mesh.name, "population_generation": int(self._population_generation), "coords": coords, "vars": var_arrays, }
[docs] def apply_snapshot_payload(self, payload: dict) -> None: """Rebuild this swarm's local particle population from a payload. Algorithm: 1. Drop every current local particle (``dm.removePoint`` from the end is O(1) per call, O(N) total). 2. Add the captured-rank's particles back via the raw PETSc primitives — ``addNPoints`` then writing the coord field directly. We deliberately bypass :meth:`add_particles_with_coordinates` because that method filters via ``points_in_domain`` (slow) and triggers ``dm.migrate`` (unnecessary — saved coords were already local at capture time, and the mesh hasn't changed). 3. Write captured per-variable data back. The local particle count matches the captured count because we just put the same particles back in the same order. This bumps ``_population_generation`` once (from the addNPoints step in restore), which is correct: the population *did* just change. Downstream consumers that care can compare against the captured value in ``payload['captured_population_generation']``. """ from underworld3.checkpoint.snapshot import SnapshotInvalidatedError saved_coords = np.asarray(payload["coords"]) # Step 1: clear local population. removePoint() removes the last # particle, so this is O(N) total. while self.dm.getLocalSize() > 0: self.dm.removePoint() # Step 2: re-add. add raw points, write coords + ranks directly. n_saved = int(saved_coords.shape[0]) if n_saved > 0: self.dm.finalizeFieldRegister() self.dm.addNPoints(npoints=n_saved) coord_field = self.dm.getField("DMSwarmPIC_coor").reshape( (-1, self.dim) ) if coord_field.shape != saved_coords.shape: self.dm.restoreField("DMSwarmPIC_coor") raise SnapshotInvalidatedError( f"swarm {self._snapshot_stable_name()!r}: after " f"addNPoints({n_saved}) the coord field has shape " f"{coord_field.shape}, expected {saved_coords.shape}" ) coord_field[...] = saved_coords self.dm.restoreField("DMSwarmPIC_coor") rank_field = self.dm.getField("DMSwarm_rank") rank_field[...] = uw.mpi.rank self.dm.restoreField("DMSwarm_rank") # Invalidate canonical-data caches — the underlying arrays # have been reallocated by the addNPoints path. if hasattr(self._particle_coordinates, "_canonical_data"): self._particle_coordinates._canonical_data = None for var in self._vars.values(): if hasattr(var, "_canonical_data"): var._canonical_data = None # The raw PETSc primitives used above (removePoint loop + # addNPoints + direct field writes) deliberately bypass # Swarm.migrate / add_particles_with_coordinates, so they do # not touch _population_generation. Bump it explicitly here # for consistency with the other mutation sites — a restore # IS a population change, downstream consumers should see it. # (Comment rewritten per Copilot review on #195.) self._population_generation += 1 # Step 3: write captured per-variable data. Per Copilot # review on #195, also raise on the inverse direction — # any user swarm variable on the LIVE swarm that wasn't in # the snapshot would retain stale/uninitialised contents # after the clear+addNPoints reallocation, which is exactly # the silent-incoherence failure we want loud rather than # quiet. The contract is symmetric with the mesh-variable # restore: same variable set on both sides. current_user_vars = { var.clean_name: var for var in self._vars.values() if not var.name.startswith("DMSwarm") } captured_names = set(payload["vars"].keys()) live_names = set(current_user_vars.keys()) extras = live_names - captured_names if extras: raise SnapshotInvalidatedError( f"swarm {self._snapshot_stable_name()!r}: variables " f"{sorted(extras)!r} exist on the live swarm but were " f"not in the snapshot. Restore would leave them with " f"incoherent data after the population rebuild — add " f"them before the snapshot was taken, or remove them " f"before restoring." ) for var_clean_name, saved in payload["vars"].items(): var = current_user_vars.get(var_clean_name) if var is None: raise SnapshotInvalidatedError( f"swarm {self._snapshot_stable_name()!r}: variable " f"{var_clean_name!r} from snapshot is not present" ) current = np.asarray(var.data) if current.shape != saved.shape: raise SnapshotInvalidatedError( f"swarm {self._snapshot_stable_name()!r}: variable " f"{var_clean_name!r} data shape mismatch — current " f"{current.shape} vs snapshot {saved.shape}" ) current[...] = saved
def _legacy_access(self, *writeable_vars: SwarmVariable): """ This context manager makes the underlying swarm variables data available to the user. The data should be accessed via the variables `data` handle. As default, all data is read-only. To enable writeable data, the user should specify which variable they wish to modify. At the conclusion of the users context managed block, numerous further operations will be automatically executed. This includes swarm parallel migration routines where the swarm's `particle_coordinates` variable has been modified. The swarm variable proxy mesh variables will also be updated for modifed swarm variables. Parameters ---------- writeable_vars The variables for which data write access is required. Example ------- >>> import underworld3 as uw >>> someMesh = uw.discretisation.FeMesh_Cartesian() >>> with someMesh._deform_mesh(): ... someMesh.data[0] = [0.1,0.1] >>> someMesh.data[0] array([ 0.1, 0.1]) """ import time uw.timing._incrementDepth() stime = time.time() deaccess_list = [] for var in self._vars.values(): # if already accessed within higher level context manager, continue. if var._is_accessed == True: continue # set flag so variable status can be known elsewhere var._is_accessed = True # add to de-access list to rewind this later deaccess_list.append(var) # grab numpy object, setting read only if necessary var._data = self.dm.getField(var.clean_name).reshape((-1, var.num_components)) assert var._data is not None if var not in writeable_vars: var._old_data_flag = var._data.flags.writeable var._data.flags.writeable = False else: # increment variable state var._increment() # make *view* for each var component if var._proxy: for i in range(0, var.shape[0]): for j in range(0, var.shape[1]): var._data_container[i, j] = var._data_container[i, j]._replace( data=var._data[:, var._data_layout(i, j)], ) # if particles moving, update swarm state if self._particle_coordinates in writeable_vars: self._increment() # Create a class which specifies the required context # manager hooks (`__enter__`, `__exit__`). class exit_manager: def __init__(self, swarm): self.em_swarm = swarm def __enter__(self): pass def __exit__(self, *args): for var in self.em_swarm.vars.values(): # only de-access variables we have set access for. if var not in deaccess_list: continue # set this back, although possibly not required. if var not in writeable_vars: var._data.flags.writeable = var._old_data_flag var._data = None self.em_swarm.dm.restoreField(var.clean_name) var._is_accessed = False # do particle migration if coords changes if self.em_swarm._particle_coordinates in writeable_vars: # let's use the mesh index to update the particles owning cells. # note that the `petsc4py` interface is more convenient here as the # `SwarmVariable.data` interface is controlled by the context manager # that we are currently within, and it is therefore too easy to # get things wrong that way. # # # if uw.mpi.size > 1: # coords = self.em_swarm.dm.getField("DMSwarmPIC_coor").reshape( # (-1, self.em_swarm.dim) # ) # self.em_swarm.dm.restoreField("DMSwarmPIC_coor") # ## We'll need to identify the new processes here and update the particle rank value accordingly # # Even if only on one process, migrate needs to be called to remove particles that are # not in the domain. self.em_swarm.migrate( remove_sent_points=True, delete_lost_points=self.em_swarm._clip_to_mesh, ) # void these things too self.em_swarm._index = None self.em_swarm._nnmapdict = {} # do var updates for var in self.em_swarm.vars.values(): # if swarm migrated, update all. # if var updated, update var. if (self.em_swarm._particle_coordinates in writeable_vars) or ( var in writeable_vars ): var._update() if var._proxy: for i in range(0, var.shape[0]): for j in range(0, var.shape[1]): # var._data_ij[i, j] = None var._data_container[i, j] = var._data_container[i, j]._replace( data=f"SwarmVariable[...].data is only available within mesh.access() context", ) uw.timing._decrementDepth() uw.timing.log_result(time.time() - stime, "Swarm.access", 1) return exit_manager(self)
[docs] def access(self, *writeable_vars: SwarmVariable): """ Dummy access manager that provides deferred sync for backward compatibility. Uses NDArray_With_Callback.delay_callbacks_global() internally. This is a compatibility wrapper that allows existing code using the access() context manager to work with the new direct-access variable interfaces. All variable modifications are deferred and synchronized at context exit. Parameters ---------- writeable_vars Variables that will be modified (ignored - all variables are writable with the new interface, this parameter is kept for API compatibility) Returns ------- Context manager that defers variable synchronization until exit Notes ----- This method is deprecated. New code should access variable.data or variable.array directly without requiring an access context. """ import underworld3.utilities class DummyAccessContext: def __init__(self, swarm, writeable_vars): self.swarm = swarm self.writeable_vars = writeable_vars self.delay_context = None def __enter__(self): # Use NDArray_With_Callback global delay context for deferred sync self.delay_context = ( underworld3.utilities.NDArray_With_Callback.delay_callbacks_global( "swarm.access compatibility" ) ) return self.delay_context.__enter__() def __exit__(self, exc_type, exc_val, exc_tb): # This triggers all accumulated callbacks from all variables if self.delay_context: return self.delay_context.__exit__(exc_type, exc_val, exc_tb) return False return DummyAccessContext(self, writeable_vars)
## Better to have one master copy - this one is cut'n'pasted from ## the MeshVariable class def _data_layout(self, i, j=None): # mapping if self.vtype == uw.VarType.SCALAR: return 0 if self.vtype == uw.VarType.VECTOR: if j is None: return i elif i == 0: return j else: raise IndexError(f"Vectors have shape {self.mesh.dim} or {(1, self.mesh.dim)} ") if self.vtype == uw.VarType.TENSOR: if self.mesh.dim == 2: return ((0, 1), (2, 3))[i][j] else: return ((0, 1, 2), (3, 4, 5), (6, 7, 8))[i][j] if self.vtype == uw.VarType.SYM_TENSOR: if self.mesh.dim == 2: return ((0, 2), (2, 1))[i][j] else: return ((0, 3, 4), (3, 1, 5), (4, 5, 2))[i][j] if self.vtype == uw.VarType.MATRIX: return i + j * self.shape[0] ## Check this - the interface to kdtree has changed, are we picking the correct field ? @timing.routine_timer_decorator def _get_map(self, var): # generate tree if not avaiable kd = self._get_kdtree() # get or generate map meshvar_coords = var._meshVar.coords # we can't use numpy arrays directly as keys in python dicts, so # we'll use `xxhash` to generate a hash of array. # this shouldn't be an issue performance wise but we should test to be # sufficiently confident of this. import xxhash h = xxhash.xxh64() h.update(meshvar_coords) digest = h.intdigest() if digest not in self._nnmapdict: self._nnmapdict[digest] = kd.query(meshvar_coords, k=1, sqr_dists=False)[1] return self._nnmapdict[digest]
[docs] @timing.routine_timer_decorator def advection( self, V_fn, delta_t, order=2, corrector=False, restore_points_to_domain_func=None, evalf=False, step_limit=False, ): # Convert delta_t to model units if it has units # This ensures consistent arithmetic: velocity is in model units, so time must be too import underworld3 as uw delta_t_model = uw.scaling.non_dimensionalise(delta_t) dt_limit = self.estimate_dt(V_fn) if step_limit and dt_limit is not None: substeps = int(max(1, round(abs(delta_t_model) / dt_limit))) else: substeps = 1 if uw.mpi.rank == 0 and self.verbose: print(f"Substepping {substeps} / {abs(delta_t) / dt_limit}, {delta_t} ") # X0 holds the particle location at the start of advection # This is needed because the particles may be migrated off-proc # during timestepping. Probably not needed - use global evaluation instead X0 = self._X0 V_fn_matrix = self.mesh.vector.to_matrix(V_fn) # Use current velocity to estimate where the particles would have # landed in an implicit step. WE CANT DO THIS WITH SUB-STEPPING unless # We have a lot more information about the previous launch point / timestep # Also: how does this interact with the particle restoration function ? # if corrector == True and not self._X0_uninitialised: # with self.access(self._particle_coordinates): # v_at_Vpts = np.zeros_like(self.data) # if evalf: # for d in range(self.dim): # v_at_Vpts[:, d] = uw.function.evalf( # V_fn_matrix[d], self.data # ).reshape(-1) # else: # for d in range(self.dim): # v_at_Vpts[:, d] = uw.function.evaluate( # V_fn_matrix[d], self.data # ).reshape(-1) # corrected_position = X0.data.copy() + delta_t * v_at_Vpts # if restore_points_to_domain_func is not None: # corrected_position = restore_points_to_domain_func( # corrected_position # ) # updated_current_coords = 0.5 * (corrected_position + self.data.copy()) # # validate_coords to ensure they live within the domain (or there will be trouble) # if restore_points_to_domain_func is not None: # updated_current_coords = restore_points_to_domain_func( # updated_current_coords # ) # self.data[...] = updated_current_coords[...] # del updated_current_coords # del v_at_Vpts # Wrap this whole thing in sub-stepping loop for step in range(0, substeps): X0.array[:, 0, :] = self._particle_coordinates.data[...] # Mid point algorithm (2nd order) if order == 2: print(f"Advection (2nd): {self.local_size} - swarm points", flush=True) # Use internal model-unit coordinates directly (no conversion needed) v_at_Vpts = np.zeros_like(self._particle_coordinates.data[...]) # First evaluate the velocity at the particle locations # (this is a local operation) v_at_Vpts[...] = uw.function.evaluate(V_fn_matrix, self._particle_coordinates.data)[ :, 0, : ] mid_pt_coords = ( self._particle_coordinates.data[...] + 0.5 * delta_t_model * v_at_Vpts / substeps ) # This will re-position particles in periodic domains (etc) if self.mesh.return_coords_to_bounds is not None: mid_pt_coords = self.mesh.return_coords_to_bounds(mid_pt_coords) # Now do a **Global** evaluation # (since the mid-points might have moved off-proc) # v_at_Vpts[...] = uw.function.global_evaluate(V_fn_matrix, mid_pt_coords)[:, 0, :] new_coords = X0.array[:, 0, :] + delta_t_model * v_at_Vpts / substeps if self.mesh.return_coords_to_bounds is not None: new_coords = self.mesh.return_coords_to_bounds(new_coords) # Set the new particle positions (and automatically migrate) self._particle_coordinates.data[...] = new_coords[...] del new_coords del v_at_Vpts # forward Euler (1st order) else: coords = self._particle_coordinates.data print( f"1. Advection (1st): {coords.shape} v {self.local_size} - swarm point shape", flush=True, ) v_at_Vpts = np.zeros_like(coords) v_at_Vpts[...] = uw.function.global_evaluate(V_fn_matrix, coords[...])[:, 0, :] print( f"2. Advection (1st): {coords.shape} v {self.local_size} - swarm point shape", flush=True, ) new_coords = coords[...] + delta_t_model * v_at_Vpts / substeps print( f"3. Advection (1st): {coords.shape} v {self.local_size} - swarm point shape", flush=True, ) if self.mesh.return_coords_to_bounds is not None: new_coords = self.mesh.return_coords_to_bounds(new_coords) self._particle_coordinates.data[...] = new_coords[...] ## End of substepping loop ## Cycling of the swarm is a cheap and cheerful version of population control for particles. It turns the ## swarm into a streak-swarm where particles are Lagrangian for a number of steps and then reset to their ## original location. if self.recycle_rate > 1: # Restore particles which have cycle == cycle rate (use >= just in case) # Remove remesh points and recreate a new set at the mesh-local # locations that we already have stored. with self.access(self._particle_coordinates, self._remeshed): remeshed = self._remeshed.data[:, 0] == 0 # This is one way to do it ... we can do this better though self.data[remeshed, 0] = 1.0e100 swarm_size = self.dm.getLocalSize() num_remeshed_points = self.mesh.particle_X_orig.shape[0] self.dm.addNPoints(num_remeshed_points) # Informational: remesh just re-injected particles. self._population_generation += 1 ## cellid = self.dm.getField("DMSwarm_cellid") coords = self.dm.getField("DMSwarmPIC_coor").reshape((-1, self.dim)) rmsh = self.dm.getField("DMSwarm_remeshed") # print(f"cellid -> {cellid.shape}") # print(f"particle coords -> {coords.shape}") # print(f"remeshed points -> {num_remeshed_points}") # Compute perturbation - extract magnitude if coordinates have units # numpy.array(..., dtype=float64) forces conversion to plain array coord_data = np.array(self.mesh.particle_X_orig[:, :], dtype=np.float64) radii_data = np.array(self.mesh._radii[cellid[swarm_size::]], dtype=np.float64) perturbation = 0.00001 * ( (0.33 / (1 + self.fill_param)) * (np.random.random(size=(num_remeshed_points, self.dim)) - 0.5) * radii_data.reshape(-1, 1) ) # Add perturbation (coords array stores dimensionless values) coords[swarm_size::] = coord_data + perturbation ## cellid[swarm_size::] = self.mesh.particle_CellID_orig[:, 0] rmsh[swarm_size::] = 0 # self.dm.restoreField("DMSwarm_cellid") self.dm.restoreField("DMSwarmPIC_coor") self.dm.restoreField("DMSwarm_remeshed") # when we let this go, the particles may be re-distributed to # other processors, and we will need to rebuild the remeshed # array before trying to compute / assign values to variables for swarmVar in self.vars.values(): if swarmVar._rebuild_on_cycle: with self.access(swarmVar): if swarmVar.dtype is int: nnn = 1 else: nnn = self.mesh.dim + 1 # 3 for triangles, 4 for tets ... interpolated_values = ( swarmVar.rbf_interpolate(self.mesh.particle_X_orig, nnn=nnn) # swarmVar._meshVar.fn, self.mesh.particle_X_orig # ) ).astype(swarmVar.dtype) swarmVar.data[swarm_size::] = interpolated_values ## ## Determine RANK ## # Migrate will already have been called by the access manager. # Maybe we should hash the local particle coords to make this # a little more user-friendly # self.dm.migrate(remove_sent_points=True) with self.access(self._remeshed): self._remeshed.data[...] = np.mod(self._remeshed.data[...] - 1, self.recycle_rate) self.cycle += 1 ## End of cycle_swarm loop # # # Re-route particles to their owning ranks and remove any that # have genuinely left the domain. Use the default max_its so that # boundary particles whose owner is the 2nd/3rd-closest centroid # get reclaimed via the kdtree retry — max_its=1 here was an # accidental regression that deleted boundary particles (issue #175, # reported by @bknight1). self.migrate( delete_lost_points=True, ) return
[docs] @timing.routine_timer_decorator def estimate_dt(self, V_fn): """ Calculates an appropriate advective timestep for the given mesh and velocity configuration. """ # we'll want to do this on an element by element basis # for more general mesh # first let's extract a max global velocity magnitude import math import numpy as np vel = uw.function.evaluate(V_fn, self._particle_coordinates.data, evalf=True) # If vel is unit-aware (UnitAwareArray), nondimensionalise it to get # consistent nondimensional values that match mesh._radii # Note: .magnitude returns physical units, which would be wrong here if hasattr(vel, "units") and vel.units is not None: vel = uw.non_dimensionalise(vel) elif hasattr(vel, "magnitude"): # Plain UWQuantity without units context - use magnitude vel = vel.magnitude # Ensure vel is a plain numpy array vel = np.asarray(vel) try: magvel_squared = vel[:, 0] ** 2 + vel[:, 1] ** 2 if self.mesh.dim == 3: magvel_squared += vel[:, 2] ** 2 max_magvel = math.sqrt(magvel_squared.max()) except (ValueError, IndexError): max_magvel = 0.0 from mpi4py import MPI max_magvel_glob = comm.allreduce(max_magvel, op=MPI.MAX) min_dx = self.mesh.get_min_radius() # The assumption should be that we cross one or two elements (2-4 radii), not more, # in a single step (order 2, means one element per half-step or something # that we can broadly interpret that way) if max_magvel_glob != 0.0: return min_dx / max_magvel_glob else: return None
[docs] class NodalPointSwarm(Swarm): r"""BASIC_Swarm with particles located at the coordinate points of a meshVariable The swarmVariable `X0` is defined so that the particles can "snap back" to their original locations after they have been moved. The purpose of this Swarm is to manage sample points for advection schemes based on upstream sampling (method of characteristics etc)"""
[docs] def __init__( self, trackedVariable: uw.discretisation.MeshVariable, verbose=False, ): self.trackedVariable = trackedVariable self.swarmVariable = None mesh = trackedVariable.mesh # Set up a standard swarm super().__init__(mesh, verbose, clip_to_mesh=False) nswarm = self meshVar_name = trackedVariable.clean_name meshVar_symbol = trackedVariable.symbol ks = str(self.instance_number) name = f"{meshVar_name}_star" symbol = rf"{{ {meshVar_symbol} }}^{{ <*> }}" self.swarmVariable = uw.swarm.SwarmVariable( name, nswarm, vtype=trackedVariable.vtype, _proxy=False, varsymbol=symbol, ) # The launch point location name = f"ns_X0_{ks}" symbol = r"X0^{*^{{[" + ks + "]}}}" nX0 = uw.swarm.SwarmVariable(name, nswarm, nswarm.dim, _proxy=False) # The launch point index name = f"ns_I_{ks}" symbol = r"I^{*^{{[" + ks + "]}}}" nI0 = uw.swarm.SwarmVariable(name, nswarm, 1, dtype=int, _proxy=False) # The launch point processor rank name = f"ns_R0_{ks}" symbol = r"R0^{*^{{[" + ks + "]}}}" nR0 = uw.swarm.SwarmVariable(name, nswarm, 1, dtype=int, _proxy=False) nswarm.dm.finalizeFieldRegister() # PETSc < 3.24 has an off-by-one bug in addNPoints when swarm size is initially zero # It allocates N-1 instead of N, so we add +1 to compensate # PETSc 3.24+ fixed this bug, so we use the exact count from petsc4py import PETSc npts_to_add = trackedVariable.coords.shape[0] if PETSc.Sys.getVersion() < (3, 24, 0): npts_to_add = npts_to_add + 1 nswarm.dm.addNPoints(npts_to_add) coords = nswarm.dm.getField("DMSwarmPIC_coor").reshape((-1, nswarm.dim)) ranks = nswarm.dm.getField("DMSwarm_rank").reshape((-1, 1)) coords[...] = trackedVariable.coords[...] ranks[...] = uw.mpi.rank cellid = self.mesh.get_closest_cells( coords, ) # Move slightly within the chosen cell to avoid edge effects centroid_coords = self.mesh._centroids[cellid] shift = 0.001 coords[:, :] = (1.0 - shift) * coords[:, :] + shift * centroid_coords[:, :] nswarm.dm.restoreField("DMSwarmPIC_coor") nswarm.dm.restoreField("DMSwarm_rank") nswarm.dm.migrate(remove_sent_points=True) with nswarm.access(nX0, nI0): nX0.data[:, :] = coords nI0.data[:, 0] = range(0, coords.shape[0]) self._nswarm = nswarm self._nX0 = nX0 self._nI0 = nI0 self._nR0 = nR0 return
[docs] @timing.routine_timer_decorator def advection( self, V_fn, delta_t, order=2, corrector=False, restore_points_to_domain_func=None, evalf=False, step_limit=True, ): with self.access(self._X0): self._X0.data[...] = self._nX0.data[...] with self.access(self._nR0): self._nR0.data[...] = uw.mpi.rank super().advection( V_fn, delta_t, order, corrector, restore_points_to_domain_func, evalf, step_limit, ) return
## New - Basic Swarm (no PIC skillz) ## What is missing: ## - no celldm ## - PIC layouts of particles are not directly available / must be done by hand ## - No automatic migration - must compute ranks for the particle swarms ## - No automatic definition of coordinate fields (need to add by hand)