Source code for underworld3.utilities.uw_petsc_gen_xdmf

#!/usr/bin/env python
# VTK high order: https://blog.kitware.com/modeling-arbitrary-order-lagrange-finite-elements-in-the-visualization-toolkit/
import h5py
import numpy as np
import os, sys


class Xdmf:
    def __init__(self, filename):
        self.filename = filename
        self.cellMap = {
            1: {1: "Polyvertex", 2: "Polyline"},
            2: {3: "Triangle", 4: "Quadrilateral"},
            3: {4: "Tetrahedron", 6: "Wedge", 8: "Hexahedron"},
        }

        # py2/py3 compatibility, see https://github.com/h5py/h5py/issues/379
        if sys.version_info[0] < 3:
            self.typeMap = {
                "scalar": "Scalar",
                "vector": "Vector",
                "tensor": "Tensor6",
                "matrix": "Matrix",
            }
            self.typeExt = {
                2: {"vector": ["x", "y"], "tensor": ["xx", "yy", "xy"]},
                3: {
                    "vector": ["x", "y", "z"],
                    "tensor": ["xx", "yy", "zz", "xy", "yz", "xz"],
                },
            }
        else:
            self.typeMap = {
                b"scalar": "Scalar",
                b"vector": "Vector",
                b"tensor": "Tensor6",
                b"matrix": "Matrix",
            }
            self.typeExt = {
                2: {b"vector": ["x", "y"], b"tensor": ["xx", "yy", "xy"]},
                3: {
                    b"vector": ["x", "y", "z"],
                    b"tensor": ["xx", "yy", "zz", "xy", "yz", "xz"],
                },
            }
        return

    def writeHeader(self, fp, hdfFilename):
        fp.write(
            """\
<?xml version="1.0" ?>
<!DOCTYPE Xdmf SYSTEM "Xdmf.dtd" [
<!ENTITY HeavyData "%s">
]>
"""
            % os.path.basename(hdfFilename)
        )
        fp.write('\n<Xdmf>\n  <Domain Name="domain">\n')
        return

    def writeCells(self, fp, topologyPath, numCells, numCorners, cellsName="cells"):
        fp.write(
            """\
    <DataItem Name="%s"
              ItemType="Uniform"
              Format="HDF"
              NumberType="Float" Precision="8"
              Dimensions="%d %d">
      &HeavyData;:/%s/cells
    </DataItem>
"""
            % (cellsName, numCells, numCorners, topologyPath)
        )
        return

    def writeVertices(self, fp, geometryPath, numVertices, spaceDim):
        fp.write(
            """\
    <DataItem Name="vertices"
              Format="HDF"
              Dimensions="%d %d">
      &HeavyData;:/%s/vertices
    </DataItem>
    <!-- ============================================================ -->
"""
            % (numVertices, spaceDim, geometryPath)
        )
        return

    def writeLocations(self, fp, numParticles, spaceDim):
        fp.write(
            """\
    <DataItem Name="particle_coordinates"
              Format="HDF"
              Dimensions="%d %d">
      &HeavyData;:/particles/coordinates
    </DataItem>
"""
            % (numParticles, spaceDim)
        )
        return

    def writeTimeGridHeader(self, fp, time):
        fp.write(
            """\
    <Grid Name="TimeSeries" GridType="Collection" CollectionType="Temporal">
      <Time TimeType="List">
        <DataItem Format="XML" NumberType="Float" Dimensions="%d">
          """
            % (len(time))
        )
        fp.write(" ".join([str(float(t)) for t in time]))
        fp.write(
            """
        </DataItem>
      </Time>
"""
        )
        return

    # http://www.xdmf.org/index.php/XDMF_Model_and_Format#Topology
    def writeHybridSpaceGridHeader(self, fp):
        fp.write('      <Grid Name="domain" GridType="Collection">\n')
        return

    def writeSpaceGridHeader(self, fp, numCells, numCorners, cellDim, spaceDim, cellsName="cells"):
        fp.write(
            """\
      <Grid Name="domain" GridType="Uniform">
        <Topology
           TopologyType="%s"
           NumberOfElements="%d">
          <DataItem Reference="XML">
            /Xdmf/Domain/DataItem[@Name="%s"]
          </DataItem>
        </Topology>
        <Geometry GeometryType="%s">
          <DataItem Reference="XML">
            /Xdmf/Domain/DataItem[@Name="vertices"]
          </DataItem>
        </Geometry>
"""
            % (
                self.cellMap[cellDim][numCorners],
                numCells,
                cellsName,
                "XYZ" if spaceDim > 2 else "XY",
            )
        )
        return

    def writeFieldSingle(self, fp, numSteps, timestep, spaceDim, name, f, domain):
        if len(f[1].shape) > 2:
            dof = f[1].shape[1]
            bs = f[1].shape[2]
        elif len(f[1].shape) > 1:
            if numSteps > 1:
                dof = f[1].shape[1]
                bs = 1
            else:
                dof = f[1].shape[0]
                bs = f[1].shape[1]
        else:
            dof = f[1].shape[0]
            bs = 1

        if bs > 1:
            this_is_a = "Vector"
        else:
            this_is_a = "Scalar"

        fp.write(
            """\
        <Attribute
           Name="%s"
           Type="%s"
           Center="%s">
          <DataItem ItemType="HyperSlab"
        	    Dimensions="1 %d %d"
        	    Type="HyperSlab">
            <DataItem
               Dimensions="3 3"
               Format="XML">
              %d 0 0
              1 1 1
              1 %d %d
            </DataItem>
            <DataItem
               DataType="Float" Precision="8"
               Dimensions="%d %d %d"
               Format="HDF">
              &HeavyData;:%s
            </DataItem>
          </DataItem>
        </Attribute>
"""
            % (
                f[0],
                this_is_a,  # self.typeMap[f[1].attrs["vector_field_type"]],
                domain,
                dof,
                bs,
                timestep,
                dof,
                bs,
                numSteps,
                dof,
                bs,
                name,
            )
        )
        return

    def writeFieldComponents(self, fp, numSteps, timestep, spaceDim, name, f, domain):
        # vtype = f[1].attrs["vector_field_type"]

        if len(f[1].shape) > 2:
            dof = f[1].shape[1]
            bs = f[1].shape[2]
            cdims = "1 %d 1" % dof
            dims = "%d %d %d" % (numSteps, dof, bs)
            stride = "1 1 1"
            size = "1 %d 1" % dof
        else:
            dof = f[1].shape[0]
            bs = f[1].shape[1]
            cdims = "%d 1" % dof
            dims = "%d %d" % (dof, bs)
            stride = "1 1"
            size = "%d 1" % dof

        # print(
        #     f"name = {name}, dof = {dof}, dims = {dims}, cdims = {cdims}, bs = {bs}, spaceDim = {spaceDim}",
        #     flush=True,
        # )

        # if bs == spaceDim:
        #     vtype = b"vector"
        # elif bs == 3 * spaceDim:
        #     vtype = b"tensor"
        # else:
        #     vtype = b"matrix"

        for c in range(bs):
            # ext = self.typeExt[spaceDim][vtype][c]
            ext = f"{c}"  # Let's just number the components since it is not entirely obvious what they will be
            if len(f[1].shape) > 2:
                start = "%d 0 %d" % (timestep, c)
            else:
                start = "0 %d" % c
            fp.write(
                """\
        <Attribute
           Name="%s"
           Type="Scalar"
           Center="%s">
          <DataItem ItemType="HyperSlab"
        	    Dimensions="%s"
        	    Type="HyperSlab">
            <DataItem
               Dimensions="3 %d"
               Format="XML">
              %s
              %s
              %s
            </DataItem>
            <DataItem
               DataType="Float" Precision="8"
               Dimensions="%s"
               Format="HDF">
              &HeavyData;:%s
            </DataItem>
          </DataItem>
        </Attribute>
"""
                % (
                    f[0] + "_" + ext,
                    domain,
                    cdims,
                    len(f[1].shape),
                    start,
                    stride,
                    size,
                    dims,
                    name,
                )
            )
        return

    def writeField(self, fp, numSteps, timestep, cellDim, spaceDim, name, f, domain):
        # print(f"Writing field {f}")
        # print(f"cellDim: {cellDim}")
        # print(f"spaceDim: {spaceDim}")
        # print(f"Number of Cells {self.numCells}")
        # print(f"Number of Verticds {self.numVertices}")
        # print(f"Field size: {f[1].shape}")

        try:
            components = f[1].shape[1]
        except IndexError:
            components = 1

        # Scalars or Vectors are mapped to a single entity in xdmf
        # and this is very convenient with paraview, for example.
        # Other entities are send through component-by-components
        # and the user can figure out what to do when visualising them.
        # ToDo: maybe it would be helpful to have the tensors done properly too.

        if components == 1 or components == spaceDim:
            self.writeFieldSingle(fp, numSteps, timestep, spaceDim, name, f, domain)
        else:
            self.writeFieldComponents(fp, numSteps, timestep, spaceDim, name, f, domain)

        # ctypes = ["tensor", "matrix"]
        # if spaceDim == 2 or cellDim != spaceDim:
        #     ctypes.append("vector")
        # if f[1].attrs["vector_field_type"] in ctypes:
        #     self.writeFieldComponents(fp, numSteps, timestep, spaceDim, name, f, domain)
        # else:
        #     self.writeFieldSingle(fp, numSteps, timestep, spaceDim, name, f, domain)
        # return

    def writeSpaceGridFooter(self, fp):
        fp.write("      </Grid>\n")
        return

    def writeParticleGridHeader(self, fp, numParticles, spaceDim):
        fp.write(
            """\
      <Grid Name="particle_domain" GridType="Uniform">
        <Topology TopologyType="Polyvertex" NodesPerElement="%d" />
        <Geometry GeometryType="%s">
          <DataItem Reference="XML">/Xdmf/Domain/DataItem[@Name="particle_coordinates"]</DataItem>
        </Geometry>
"""
            % (numParticles, "XYZ" if spaceDim > 2 else "XY")
        )
        return

    def writeParticleField(self, fp, fieldname, numParticles, numComp):
        fp.write(
            """\
    <Attribute Name="particles/%s">
      <DataItem Name="%s"
                Format="HDF"
                Dimensions="%d %d">
                &HeavyData;:/particle_fields/%s
      </DataItem>
    </Attribute>
"""
            % (fieldname, fieldname, numParticles, numComp, fieldname)
        )
        return

    def writeTimeGridFooter(self, fp):
        fp.write("    </Grid>\n")
        return

    def writeFooter(self, fp):
        fp.write("  </Domain>\n</Xdmf>\n")
        return

    def write(
        self,
        hdfFilename,
        topologyPath,
        numCells,
        numCorners,
        cellDim,
        htopologyPath,
        numHCells,
        numHCorners,
        geometryPath,
        numVertices,
        spaceDim,
        time,
        vfields,
        cfields,
        numParticles,
        pfields,
    ):
        self.numVertices = numVertices
        self.numCells = numCells

        useTime = not (len(time) < 2 and time[0] == -1)
        with open(self.filename, "w") as fp:
            self.writeHeader(fp, hdfFilename)
            # Field information
            self.writeCells(fp, topologyPath, numCells, numCorners)
            if numHCells:
                self.writeCells(fp, htopologyPath, numHCells, numHCorners, "hcells")
            self.writeVertices(fp, geometryPath, numVertices, spaceDim)
            if useTime:
                self.writeTimeGridHeader(fp, time)
            for t in range(len(time)):
                if numHCells:
                    self.writeHybridSpaceGridHeader(fp)
                    self.writeSpaceGridHeader(
                        fp, numHCells, numHCorners, cellDim, spaceDim, "hcells"
                    )
                    self.writeSpaceGridFooter(fp)
                self.writeSpaceGridHeader(fp, numCells, numCorners, cellDim, spaceDim)
                for vf in vfields:
                    vpath = vf[2] if len(vf) > 2 else "/vertex_fields/" + vf[0]
                    self.writeField(
                        fp,
                        len(time),
                        t,
                        cellDim,
                        spaceDim,
                        vpath,
                        vf,
                        "Node",
                    )
                for cf in cfields:
                    cpath = cf[2] if len(cf) > 2 else "/cell_fields/" + cf[0]
                    self.writeField(
                        fp,
                        len(time),
                        t,
                        cellDim,
                        spaceDim,
                        cpath,
                        cf,
                        "Cell",
                    )
                self.writeSpaceGridFooter(fp)
                if numHCells:
                    self.writeSpaceGridFooter(fp)
            if useTime:
                self.writeTimeGridFooter(fp)
            if numParticles:
                self.writeLocations(fp, numParticles, spaceDim)
                if useTime:
                    self.writeTimeGridHeader(fp, time)
                for t in range(len(time)):
                    self.writeParticleGridHeader(fp, numParticles, spaceDim)
                    for pf in pfields:
                        self.writeParticleField(fp, pf[0], numParticles, int(pf[1].attrs["Nc"]))
                    self.writeSpaceGridFooter(fp)
                if useTime:
                    self.writeTimeGridFooter(fp)
            self.writeFooter(fp)
        return


def _collect_mesh_fields(h5, numVertices, numCells):
    """Collect mesh fields from HDF5 in either new (/fields) or legacy layouts."""
    vfields = []
    cfields = []

    # New layout: /fields/<name>, with /fields/coordinates reserved for point locations
    if "fields" in h5:
        for name, dataset in h5["fields"].items():
            if name == "coordinates":
                continue

            dims = tuple(dataset.shape)
            has_vertices_dim = numVertices in dims
            has_cells_dim = numCells in dims

            item = (name, dataset, f"/fields/{name}")

            if has_cells_dim and not has_vertices_dim:
                cfields.append(item)
            else:
                # Default to node-centered when ambiguous
                vfields.append(item)

        return vfields, cfields

    # Legacy layout
    if "vertex_fields" in h5:
        vfields = list(h5["vertex_fields"].items())
    if "cell_fields" in h5:
        cfields = list(h5["cell_fields"].items())

    return vfields, cfields


[docs] def generateXdmf(hdfFilename, xdmfFilename=None): if xdmfFilename is None: xdmfFilename = os.path.splitext(hdfFilename)[0] + ".xmf" # Read mesh h5 = h5py.File(hdfFilename, "r") if "viz" in h5 and "geometry" in h5["viz"]: geomPath = "viz/geometry" geom = h5["viz"]["geometry"] else: geomPath = "geometry" geom = h5["geometry"] if "viz" in h5 and "topology" in h5["viz"]: topoPath = "viz/topology" topo = h5["viz"]["topology"] else: topoPath = "topology" topo = h5["topology"] if "viz" in h5 and "hybrid_topology" in h5["viz"]: htopoPath = "viz/hybrid_topology" htopo = h5["viz"]["hybrid_topology"] else: htopoPath = None htopo = None vertices = geom["vertices"] numVertices = vertices.shape[0] spaceDim = vertices.shape[1] cells = topo["cells"] numCells = cells.shape[0] numCorners = cells.shape[1] cellDim = topo["cells"].attrs["cell_dim"] if htopo: hcells = htopo["cells"] numHCells = hcells.shape[0] numHCorners = hcells.shape[1] else: numHCells = 0 numHCorners = 0 if "time" in h5: time = np.array(h5["time"]).flatten() else: time = [-1] vfields, cfields = _collect_mesh_fields(h5, numVertices, numCells) pfields = [] numParticles = 0 if "particles" in h5: numParticles = h5["particles"]["coordinates"].shape[0] if "particle_fields" in h5: pfields = list(h5["particle_fields"].items()) # Write Xdmf Xdmf(xdmfFilename).write( hdfFilename, topoPath, numCells, numCorners, cellDim, htopoPath, numHCells, numHCorners, geomPath, numVertices, spaceDim, time, vfields, cfields, numParticles, pfields, ) h5.close() return
[docs] def generate_uw_Xdmf(hdfFilename, xdmfFilename=None): if xdmfFilename is None: xdmfFilename = os.path.splitext(hdfFilename)[0] + ".xmf" # Read mesh h5 = h5py.File(hdfFilename, "r") if "viz" in h5 and "geometry" in h5["viz"]: geomPath = "viz/geometry" geom = h5["viz"]["geometry"] else: geomPath = "geometry" geom = h5["geometry"] if "viz" in h5 and "topology" in h5["viz"]: topoPath = "viz/topology" topo = h5["viz"]["topology"] else: topoPath = "topology" topo = h5["topology"] if "viz" in h5 and "hybrid_topology" in h5["viz"]: htopoPath = "viz/hybrid_topology" htopo = h5["viz"]["hybrid_topology"] else: htopoPath = None htopo = None vertices = geom["vertices"] numVertices = vertices.shape[0] spaceDim = vertices.shape[1] cells = topo["cells"] numCells = cells.shape[0] numCorners = cells.shape[1] cellDim = topo["cells"].attrs["cell_dim"] if htopo: hcells = htopo["cells"] numHCells = hcells.shape[0] numHCorners = hcells.shape[1] else: numHCells = 0 numHCorners = 0 if "time" in h5: time = np.array(h5["time"]).flatten() else: time = [-1] vfields, cfields = _collect_mesh_fields(h5, numVertices, numCells) pfields = [] numParticles = 0 if "particles" in h5: numParticles = h5["particles"]["coordinates"].shape[0] if "particle_fields" in h5: pfields = list(h5["particle_fields"].items()) # Write Xdmf Xdmf(xdmfFilename).write( hdfFilename, topoPath, numCells, numCorners, cellDim, htopoPath, numHCells, numHCorners, geomPath, numVertices, spaceDim, time, vfields, cfields, numParticles, pfields, ) h5.close() return
if __name__ == "__main__": for f in sys.argv[1:]: generateXdmf(f)