#!/usr/bin/env python
# VTK high order: https://blog.kitware.com/modeling-arbitrary-order-lagrange-finite-elements-in-the-visualization-toolkit/
import h5py
import numpy as np
import os, sys
class Xdmf:
def __init__(self, filename):
self.filename = filename
self.cellMap = {
1: {1: "Polyvertex", 2: "Polyline"},
2: {3: "Triangle", 4: "Quadrilateral"},
3: {4: "Tetrahedron", 6: "Wedge", 8: "Hexahedron"},
}
# py2/py3 compatibility, see https://github.com/h5py/h5py/issues/379
if sys.version_info[0] < 3:
self.typeMap = {
"scalar": "Scalar",
"vector": "Vector",
"tensor": "Tensor6",
"matrix": "Matrix",
}
self.typeExt = {
2: {"vector": ["x", "y"], "tensor": ["xx", "yy", "xy"]},
3: {
"vector": ["x", "y", "z"],
"tensor": ["xx", "yy", "zz", "xy", "yz", "xz"],
},
}
else:
self.typeMap = {
b"scalar": "Scalar",
b"vector": "Vector",
b"tensor": "Tensor6",
b"matrix": "Matrix",
}
self.typeExt = {
2: {b"vector": ["x", "y"], b"tensor": ["xx", "yy", "xy"]},
3: {
b"vector": ["x", "y", "z"],
b"tensor": ["xx", "yy", "zz", "xy", "yz", "xz"],
},
}
return
def writeHeader(self, fp, hdfFilename):
fp.write(
"""\
<?xml version="1.0" ?>
<!DOCTYPE Xdmf SYSTEM "Xdmf.dtd" [
<!ENTITY HeavyData "%s">
]>
"""
% os.path.basename(hdfFilename)
)
fp.write('\n<Xdmf>\n <Domain Name="domain">\n')
return
def writeCells(self, fp, topologyPath, numCells, numCorners, cellsName="cells"):
fp.write(
"""\
<DataItem Name="%s"
ItemType="Uniform"
Format="HDF"
NumberType="Float" Precision="8"
Dimensions="%d %d">
&HeavyData;:/%s/cells
</DataItem>
"""
% (cellsName, numCells, numCorners, topologyPath)
)
return
def writeVertices(self, fp, geometryPath, numVertices, spaceDim):
fp.write(
"""\
<DataItem Name="vertices"
Format="HDF"
Dimensions="%d %d">
&HeavyData;:/%s/vertices
</DataItem>
<!-- ============================================================ -->
"""
% (numVertices, spaceDim, geometryPath)
)
return
def writeLocations(self, fp, numParticles, spaceDim):
fp.write(
"""\
<DataItem Name="particle_coordinates"
Format="HDF"
Dimensions="%d %d">
&HeavyData;:/particles/coordinates
</DataItem>
"""
% (numParticles, spaceDim)
)
return
def writeTimeGridHeader(self, fp, time):
fp.write(
"""\
<Grid Name="TimeSeries" GridType="Collection" CollectionType="Temporal">
<Time TimeType="List">
<DataItem Format="XML" NumberType="Float" Dimensions="%d">
"""
% (len(time))
)
fp.write(" ".join([str(float(t)) for t in time]))
fp.write(
"""
</DataItem>
</Time>
"""
)
return
# http://www.xdmf.org/index.php/XDMF_Model_and_Format#Topology
def writeHybridSpaceGridHeader(self, fp):
fp.write(' <Grid Name="domain" GridType="Collection">\n')
return
def writeSpaceGridHeader(self, fp, numCells, numCorners, cellDim, spaceDim, cellsName="cells"):
fp.write(
"""\
<Grid Name="domain" GridType="Uniform">
<Topology
TopologyType="%s"
NumberOfElements="%d">
<DataItem Reference="XML">
/Xdmf/Domain/DataItem[@Name="%s"]
</DataItem>
</Topology>
<Geometry GeometryType="%s">
<DataItem Reference="XML">
/Xdmf/Domain/DataItem[@Name="vertices"]
</DataItem>
</Geometry>
"""
% (
self.cellMap[cellDim][numCorners],
numCells,
cellsName,
"XYZ" if spaceDim > 2 else "XY",
)
)
return
def writeFieldSingle(self, fp, numSteps, timestep, spaceDim, name, f, domain):
if len(f[1].shape) > 2:
dof = f[1].shape[1]
bs = f[1].shape[2]
elif len(f[1].shape) > 1:
if numSteps > 1:
dof = f[1].shape[1]
bs = 1
else:
dof = f[1].shape[0]
bs = f[1].shape[1]
else:
dof = f[1].shape[0]
bs = 1
if bs > 1:
this_is_a = "Vector"
else:
this_is_a = "Scalar"
fp.write(
"""\
<Attribute
Name="%s"
Type="%s"
Center="%s">
<DataItem ItemType="HyperSlab"
Dimensions="1 %d %d"
Type="HyperSlab">
<DataItem
Dimensions="3 3"
Format="XML">
%d 0 0
1 1 1
1 %d %d
</DataItem>
<DataItem
DataType="Float" Precision="8"
Dimensions="%d %d %d"
Format="HDF">
&HeavyData;:%s
</DataItem>
</DataItem>
</Attribute>
"""
% (
f[0],
this_is_a, # self.typeMap[f[1].attrs["vector_field_type"]],
domain,
dof,
bs,
timestep,
dof,
bs,
numSteps,
dof,
bs,
name,
)
)
return
def writeFieldComponents(self, fp, numSteps, timestep, spaceDim, name, f, domain):
# vtype = f[1].attrs["vector_field_type"]
if len(f[1].shape) > 2:
dof = f[1].shape[1]
bs = f[1].shape[2]
cdims = "1 %d 1" % dof
dims = "%d %d %d" % (numSteps, dof, bs)
stride = "1 1 1"
size = "1 %d 1" % dof
else:
dof = f[1].shape[0]
bs = f[1].shape[1]
cdims = "%d 1" % dof
dims = "%d %d" % (dof, bs)
stride = "1 1"
size = "%d 1" % dof
# print(
# f"name = {name}, dof = {dof}, dims = {dims}, cdims = {cdims}, bs = {bs}, spaceDim = {spaceDim}",
# flush=True,
# )
# if bs == spaceDim:
# vtype = b"vector"
# elif bs == 3 * spaceDim:
# vtype = b"tensor"
# else:
# vtype = b"matrix"
for c in range(bs):
# ext = self.typeExt[spaceDim][vtype][c]
ext = f"{c}" # Let's just number the components since it is not entirely obvious what they will be
if len(f[1].shape) > 2:
start = "%d 0 %d" % (timestep, c)
else:
start = "0 %d" % c
fp.write(
"""\
<Attribute
Name="%s"
Type="Scalar"
Center="%s">
<DataItem ItemType="HyperSlab"
Dimensions="%s"
Type="HyperSlab">
<DataItem
Dimensions="3 %d"
Format="XML">
%s
%s
%s
</DataItem>
<DataItem
DataType="Float" Precision="8"
Dimensions="%s"
Format="HDF">
&HeavyData;:%s
</DataItem>
</DataItem>
</Attribute>
"""
% (
f[0] + "_" + ext,
domain,
cdims,
len(f[1].shape),
start,
stride,
size,
dims,
name,
)
)
return
def writeField(self, fp, numSteps, timestep, cellDim, spaceDim, name, f, domain):
# print(f"Writing field {f}")
# print(f"cellDim: {cellDim}")
# print(f"spaceDim: {spaceDim}")
# print(f"Number of Cells {self.numCells}")
# print(f"Number of Verticds {self.numVertices}")
# print(f"Field size: {f[1].shape}")
try:
components = f[1].shape[1]
except IndexError:
components = 1
# Scalars or Vectors are mapped to a single entity in xdmf
# and this is very convenient with paraview, for example.
# Other entities are send through component-by-components
# and the user can figure out what to do when visualising them.
# ToDo: maybe it would be helpful to have the tensors done properly too.
if components == 1 or components == spaceDim:
self.writeFieldSingle(fp, numSteps, timestep, spaceDim, name, f, domain)
else:
self.writeFieldComponents(fp, numSteps, timestep, spaceDim, name, f, domain)
# ctypes = ["tensor", "matrix"]
# if spaceDim == 2 or cellDim != spaceDim:
# ctypes.append("vector")
# if f[1].attrs["vector_field_type"] in ctypes:
# self.writeFieldComponents(fp, numSteps, timestep, spaceDim, name, f, domain)
# else:
# self.writeFieldSingle(fp, numSteps, timestep, spaceDim, name, f, domain)
# return
def writeSpaceGridFooter(self, fp):
fp.write(" </Grid>\n")
return
def writeParticleGridHeader(self, fp, numParticles, spaceDim):
fp.write(
"""\
<Grid Name="particle_domain" GridType="Uniform">
<Topology TopologyType="Polyvertex" NodesPerElement="%d" />
<Geometry GeometryType="%s">
<DataItem Reference="XML">/Xdmf/Domain/DataItem[@Name="particle_coordinates"]</DataItem>
</Geometry>
"""
% (numParticles, "XYZ" if spaceDim > 2 else "XY")
)
return
def writeParticleField(self, fp, fieldname, numParticles, numComp):
fp.write(
"""\
<Attribute Name="particles/%s">
<DataItem Name="%s"
Format="HDF"
Dimensions="%d %d">
&HeavyData;:/particle_fields/%s
</DataItem>
</Attribute>
"""
% (fieldname, fieldname, numParticles, numComp, fieldname)
)
return
def writeTimeGridFooter(self, fp):
fp.write(" </Grid>\n")
return
def writeFooter(self, fp):
fp.write(" </Domain>\n</Xdmf>\n")
return
def write(
self,
hdfFilename,
topologyPath,
numCells,
numCorners,
cellDim,
htopologyPath,
numHCells,
numHCorners,
geometryPath,
numVertices,
spaceDim,
time,
vfields,
cfields,
numParticles,
pfields,
):
self.numVertices = numVertices
self.numCells = numCells
useTime = not (len(time) < 2 and time[0] == -1)
with open(self.filename, "w") as fp:
self.writeHeader(fp, hdfFilename)
# Field information
self.writeCells(fp, topologyPath, numCells, numCorners)
if numHCells:
self.writeCells(fp, htopologyPath, numHCells, numHCorners, "hcells")
self.writeVertices(fp, geometryPath, numVertices, spaceDim)
if useTime:
self.writeTimeGridHeader(fp, time)
for t in range(len(time)):
if numHCells:
self.writeHybridSpaceGridHeader(fp)
self.writeSpaceGridHeader(
fp, numHCells, numHCorners, cellDim, spaceDim, "hcells"
)
self.writeSpaceGridFooter(fp)
self.writeSpaceGridHeader(fp, numCells, numCorners, cellDim, spaceDim)
for vf in vfields:
vpath = vf[2] if len(vf) > 2 else "/vertex_fields/" + vf[0]
self.writeField(
fp,
len(time),
t,
cellDim,
spaceDim,
vpath,
vf,
"Node",
)
for cf in cfields:
cpath = cf[2] if len(cf) > 2 else "/cell_fields/" + cf[0]
self.writeField(
fp,
len(time),
t,
cellDim,
spaceDim,
cpath,
cf,
"Cell",
)
self.writeSpaceGridFooter(fp)
if numHCells:
self.writeSpaceGridFooter(fp)
if useTime:
self.writeTimeGridFooter(fp)
if numParticles:
self.writeLocations(fp, numParticles, spaceDim)
if useTime:
self.writeTimeGridHeader(fp, time)
for t in range(len(time)):
self.writeParticleGridHeader(fp, numParticles, spaceDim)
for pf in pfields:
self.writeParticleField(fp, pf[0], numParticles, int(pf[1].attrs["Nc"]))
self.writeSpaceGridFooter(fp)
if useTime:
self.writeTimeGridFooter(fp)
self.writeFooter(fp)
return
def _collect_mesh_fields(h5, numVertices, numCells):
"""Collect mesh fields from HDF5 in either new (/fields) or legacy layouts."""
vfields = []
cfields = []
# New layout: /fields/<name>, with /fields/coordinates reserved for point locations
if "fields" in h5:
for name, dataset in h5["fields"].items():
if name == "coordinates":
continue
dims = tuple(dataset.shape)
has_vertices_dim = numVertices in dims
has_cells_dim = numCells in dims
item = (name, dataset, f"/fields/{name}")
if has_cells_dim and not has_vertices_dim:
cfields.append(item)
else:
# Default to node-centered when ambiguous
vfields.append(item)
return vfields, cfields
# Legacy layout
if "vertex_fields" in h5:
vfields = list(h5["vertex_fields"].items())
if "cell_fields" in h5:
cfields = list(h5["cell_fields"].items())
return vfields, cfields
[docs]
def generateXdmf(hdfFilename, xdmfFilename=None):
if xdmfFilename is None:
xdmfFilename = os.path.splitext(hdfFilename)[0] + ".xmf"
# Read mesh
h5 = h5py.File(hdfFilename, "r")
if "viz" in h5 and "geometry" in h5["viz"]:
geomPath = "viz/geometry"
geom = h5["viz"]["geometry"]
else:
geomPath = "geometry"
geom = h5["geometry"]
if "viz" in h5 and "topology" in h5["viz"]:
topoPath = "viz/topology"
topo = h5["viz"]["topology"]
else:
topoPath = "topology"
topo = h5["topology"]
if "viz" in h5 and "hybrid_topology" in h5["viz"]:
htopoPath = "viz/hybrid_topology"
htopo = h5["viz"]["hybrid_topology"]
else:
htopoPath = None
htopo = None
vertices = geom["vertices"]
numVertices = vertices.shape[0]
spaceDim = vertices.shape[1]
cells = topo["cells"]
numCells = cells.shape[0]
numCorners = cells.shape[1]
cellDim = topo["cells"].attrs["cell_dim"]
if htopo:
hcells = htopo["cells"]
numHCells = hcells.shape[0]
numHCorners = hcells.shape[1]
else:
numHCells = 0
numHCorners = 0
if "time" in h5:
time = np.array(h5["time"]).flatten()
else:
time = [-1]
vfields, cfields = _collect_mesh_fields(h5, numVertices, numCells)
pfields = []
numParticles = 0
if "particles" in h5:
numParticles = h5["particles"]["coordinates"].shape[0]
if "particle_fields" in h5:
pfields = list(h5["particle_fields"].items())
# Write Xdmf
Xdmf(xdmfFilename).write(
hdfFilename,
topoPath,
numCells,
numCorners,
cellDim,
htopoPath,
numHCells,
numHCorners,
geomPath,
numVertices,
spaceDim,
time,
vfields,
cfields,
numParticles,
pfields,
)
h5.close()
return
[docs]
def generate_uw_Xdmf(hdfFilename, xdmfFilename=None):
if xdmfFilename is None:
xdmfFilename = os.path.splitext(hdfFilename)[0] + ".xmf"
# Read mesh
h5 = h5py.File(hdfFilename, "r")
if "viz" in h5 and "geometry" in h5["viz"]:
geomPath = "viz/geometry"
geom = h5["viz"]["geometry"]
else:
geomPath = "geometry"
geom = h5["geometry"]
if "viz" in h5 and "topology" in h5["viz"]:
topoPath = "viz/topology"
topo = h5["viz"]["topology"]
else:
topoPath = "topology"
topo = h5["topology"]
if "viz" in h5 and "hybrid_topology" in h5["viz"]:
htopoPath = "viz/hybrid_topology"
htopo = h5["viz"]["hybrid_topology"]
else:
htopoPath = None
htopo = None
vertices = geom["vertices"]
numVertices = vertices.shape[0]
spaceDim = vertices.shape[1]
cells = topo["cells"]
numCells = cells.shape[0]
numCorners = cells.shape[1]
cellDim = topo["cells"].attrs["cell_dim"]
if htopo:
hcells = htopo["cells"]
numHCells = hcells.shape[0]
numHCorners = hcells.shape[1]
else:
numHCells = 0
numHCorners = 0
if "time" in h5:
time = np.array(h5["time"]).flatten()
else:
time = [-1]
vfields, cfields = _collect_mesh_fields(h5, numVertices, numCells)
pfields = []
numParticles = 0
if "particles" in h5:
numParticles = h5["particles"]["coordinates"].shape[0]
if "particle_fields" in h5:
pfields = list(h5["particle_fields"].items())
# Write Xdmf
Xdmf(xdmfFilename).write(
hdfFilename,
topoPath,
numCells,
numCorners,
cellDim,
htopoPath,
numHCells,
numHCorners,
geomPath,
numVertices,
spaceDim,
time,
vfields,
cfields,
numParticles,
pfields,
)
h5.close()
return
if __name__ == "__main__":
for f in sys.argv[1:]:
generateXdmf(f)