Source code for mirgecom.simutil

"""Provide some utilities for building simulation applications.

General utilities

.. autofunction:: check_step
.. autofunction:: get_sim_timestep
.. autofunction:: write_visfile
.. autofunction:: global_reduce
.. autofunction:: get_reasonable_memory_pool

Diagnostic utilities

.. autofunction:: compare_fluid_solutions
.. autofunction:: componentwise_norms
.. autofunction:: max_component_norm
.. autofunction:: check_naninf_local
.. autofunction:: check_range_local
.. autofunction:: boundary_report

Mesh and element utilities

.. autofunction:: geometric_mesh_partitioner
.. autofunction:: distribute_mesh
.. autofunction:: get_number_of_tetrahedron_nodes
.. autofunction:: get_box_mesh

Simulation support utilities

.. autofunction:: configurate

File comparison utilities

.. autofunction:: compare_files_vtu
.. autofunction:: compare_files_xdmf
.. autofunction:: compare_files_hdf5


.. autoclass:: SimulationConfigurationError
.. autoclass:: ApplicationOptionsError
.. autoclass:: SimulationRuntimeError

__copyright__ = """
Copyright (C) 2021 University of Illinois Board of Trustees

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

import logging
from functools import partial
from typing import Dict, List, Optional
from logpyle import IntervalTimer

import grudge.op as op
import numpy as np
import pyopencl as cl
from arraycontext import tag_axes
from meshmode.transform_metadata import (
from arraycontext import flatten, map_array_container
from grudge.discretization import DiscretizationCollection, PartID
from grudge.dof_desc import DD_VOLUME_ALL
from meshmode.dof_array import DOFArray

from mirgecom.utils import normalize_boundaries
from mirgecom.viscous import get_viscous_timestep

logger = logging.getLogger(__name__)

[docs] class SimulationConfigurationError(RuntimeError): """Simulation physics configuration or parameters error.""" pass
[docs] class SimulationRuntimeError(RuntimeError): """General simulation runtime error.""" pass
[docs] class ApplicationOptionsError(RuntimeError): """Application command-line options error.""" pass
[docs] def get_number_of_tetrahedron_nodes(dim, order, include_faces=False): """Get number of nodes (modes) in *dim* Tetrahedron of *order*.""" # number of {nodes, modes} see e.g.: # JSH/TW Nodal DG Methods, Section 10.1 # DOI: 10.1007/978-0-387-72067-8 nnodes = int(np.math.factorial(dim+order) / (np.math.factorial(dim) * np.math.factorial(order))) if include_faces: nnodes = nnodes + (dim+1)*get_number_of_tetrahedron_nodes(dim-1, order) return nnodes
[docs] def get_box_mesh(dim, a, b, n, t=None, periodic=None, tensor_product_elements=False, **kwargs): """ Create a rectangular "box" like mesh with tagged boundary faces. The resulting mesh has boundary tags `"-i"` and `"+i"` for `i=1,...,dim` corresponding to lower and upper faces normal to coordinate dimension `i`. Parameters ---------- dim: int The mesh topological dimension a: float or tuple The coordinates of the lower corner of the box. If scalar-valued, gets promoted to a uniform tuple. b: float or tuple The coordinates of the upper corner of the box. If scalar-valued, gets promoted to a uniform tuple. n: int or tuple The number of elements along a given dimension. If scalar-valued, gets promoted to a uniform tuple. t: str or None The mesh type. See :func:`meshmode.mesh.generation.generate_box_mesh` for details. periodic: bool or tuple or None Indicates whether the mesh is periodic in a given dimension. If scalar-valued, gets promoted to a uniform tuple. Returns ------- :class:`meshmode.mesh.Mesh` The generated box mesh. """ if np.isscalar(a): a = (a,)*dim if np.isscalar(b): b = (b,)*dim if np.isscalar(n): n = (n,)*dim if periodic is None: periodic = (False,)*dim elif np.isscalar(periodic): periodic = (periodic,)*dim dim_names = ["x", "y", "z"] bttf = {} for i in range(dim): bttf["-"+str(i+1)] = ["-"+dim_names[i]] bttf["+"+str(i+1)] = ["+"+dim_names[i]] from meshmode.mesh import TensorProductElementGroup group_cls = TensorProductElementGroup if tensor_product_elements else None from meshmode.mesh.generation import generate_regular_rect_mesh as gen return gen(a=a, b=b, nelements_per_axis=n, boundary_tag_to_face=bttf, mesh_type=t, periodic=periodic, group_cls=group_cls, **kwargs)
[docs] def check_step(step, interval): """ Check step number against a user-specified interval. Utility is used typically for visualization. - Negative numbers mean 'never visualize'. - Zero means 'always visualize'. Useful for checking whether the current step is an output step, or anything else that occurs on fixed intervals. """ if interval == 0: return True elif interval < 0: return False elif step % interval == 0: return True return False
[docs] def get_sim_timestep( dcoll, state, t, dt, cfl, t_final=0.0, constant_cfl=False, local_dt=False, fluid_dd=DD_VOLUME_ALL): r"""Return the maximum stable timestep for a typical fluid simulation. This routine returns a constraint-limited timestep size for a fluid simulation. The returned timestep will be constrained by the specified Courant-Friedrichs-Lewy number, *cfl*, and the simulation max simulated time limit, *t_final*, and subject to the user's optional settings. The local fluid timestep, $\delta{t}_l$, is computed by :func:`~mirgecom.viscous.get_viscous_timestep`. Users are referred to that routine for the details of the local timestep. With the remaining simulation time $\Delta{t}_r = \left(\mathit{t\_final}-\mathit{t}\right)$, three modes are supported for the returned timestep, $\delta{t}$: - "Constant DT" mode (default): $\delta{t} = \mathbf{\text{min}} \left(\textit{dt},~\Delta{t}_r\right)$ - "Constant CFL" mode (constant_cfl=True): $\delta{t} = \mathbf{\text{min}}\left(\mathbf{\text{global\_min}}\left(\delta{t}\_l\right) ,~\Delta{t}_r\right)$ - "Local DT" mode (local_dt=True): $\delta{t} = \mathbf{\text{cell\_local\_min}} \left(\delta{t}_l\right)$ Note that for "Local DT" mode, *t_final* is ignored, and a :class:`~meshmode.dof_array.DOFArray` containing the local *cfl*-limited timestep, where $\mathbf{\text{cell\_local\_min}}\left(\delta{t}\_l\right)$ is defined as the minimum over the cell collocation points. This mode is useful for stepping to convergence of steady-state solutions. .. important:: For "Constant CFL" mode, this routine calls the collective :func:`~grudge.op.nodal_min` on the inside which involves MPI collective functions. Thus all MPI ranks on the :class:`~grudge.discretization.DiscretizationCollection` must call this routine collectively when using "Constant CFL" mode. Parameters ---------- dcoll: :class:`~grudge.discretization.DiscretizationCollection` The DG discretization collection to use state: :class:`~mirgecom.gas_model.FluidState` The full fluid conserved and thermal state t: float Current time t_final: float Final time dt: float The current timestep cfl: float The current CFL number constant_cfl: bool True if running constant CFL mode local_dt: bool True if running local DT mode. False by default. fluid_dd: grudge.dof_desc.DOFDesc the DOF descriptor of the discretization on which *state* lives. Must be a volume on the base discretization. Returns ------- float or :class:`~meshmode.dof_array.DOFArray` The global maximum stable DT based on a viscous fluid. """ actx = state.array_context if local_dt: ones = + 1.0 vdt = get_viscous_timestep(dcoll, state, dd=fluid_dd) emin = op.elementwise_min(dcoll, fluid_dd, vdt) return cfl * ones * emin my_dt = dt t_remaining = max(0, t_final - t) if constant_cfl: my_dt = state.array_context.to_numpy( cfl * op.nodal_min( dcoll, fluid_dd, get_viscous_timestep(dcoll=dcoll, state=state, dd=fluid_dd)))[()] return min(t_remaining, my_dt)
[docs] def write_visfile(dcoll, io_fields, visualizer, vizname, step=0, t=0, overwrite=False, vis_timer=None, comm=None): """Write parallel VTK output for the fields specified in *io_fields*. This routine writes a parallel-compatible unstructured VTK visualization file set in (vtu/pvtu) format. One file per MPI rank is written with the following naming convention: *vizname*_*step*_<mpi-rank>.vtu, and a single file manifest with naming convention: *vizname*_*step*.pvtu. Users are advised to visualize the data using _Paraview_, _VisIt_, or other VTU-compatible visualization software by opening the PVTU files. .. note:: This is a collective routine and must be called by all MPI ranks. Parameters ---------- visualizer: A :class:`meshmode.discretization.visualization.Visualizer` VTK output object. io_fields: List of tuples indicating the (name, data) for each field to write. vizname: str Root part of the visualization file name to write step: int The step number to use in the file names t: float The simulation time to write into the visualization files overwrite: bool Option whether to overwrite existing files (True) or fail if files exist (False=default). comm: An MPI Communicator is required for parallel writes. If no mpi_communicator is provided, then the write is assumed to be serial. (deprecated behavior: pull an MPI communicator from the discretization collection. This will stop working in Fall 2022.) """ from contextlib import nullcontext from import make_par_fname, make_rank_fname if comm is None: # None is OK for serial writes! comm = dcoll.mpi_communicator if comm is not None: # It's *not* OK to get comm from dcoll from warnings import warn warn("Using `write_visfile` in parallel without an MPI communicator is " "deprecated and will stop working in Fall 2022. For parallel " "writes, specify an MPI communicator with the `mpi_communicator` " "argument.") rank = 0 if comm is not None: rank = comm.Get_rank() rank_fn = make_rank_fname(basename=vizname, rank=rank, step=step, t=t) if rank == 0: import os viz_dir = os.path.dirname(rank_fn) if viz_dir and not os.path.exists(viz_dir): os.makedirs(viz_dir) if comm is not None: comm.barrier() if vis_timer: ctm = vis_timer.start_sub_timer() else: ctm = nullcontext() with ctm: visualizer.write_parallel_vtk_file( comm, rank_fn, io_fields, overwrite=overwrite, par_manifest_filename=make_par_fname( basename=vizname, step=step, t=t ) )
[docs] def global_reduce(local_values, op, *, comm=None): """Perform a global reduction (allreduce if MPI comm is provided). This routine is a convenience wrapper for the MPI AllReduce operation that also works outside of an MPI context. .. note:: This is a collective routine and must be called by all MPI ranks. Parameters ---------- local_values: The (:mod:`mpi4py`-compatible) value or array of values on which the reduction operation is to be performed. op: str Reduction operation to be performed. Must be one of "min", "max", "sum", "prod", "lor", or "land". comm: Optional parameter specifying the MPI communicator on which the reduction operation (if any) is to be performed Returns ------- Any ( like *local_values* ) Returns the result of the reduction operation on *local_values* """ if comm is not None: from mpi4py import MPI op_to_mpi_op = { "min": MPI.MIN, "max": MPI.MAX, "sum": MPI.SUM, "prod": MPI.PROD, "lor": MPI.LOR, "land": MPI.LAND, } return comm.allreduce(local_values, op=op_to_mpi_op[op]) else: if np.ndim(local_values) == 0: return local_values else: op_to_numpy_func = { "min": np.minimum, "max": np.maximum, "sum": np.add, "prod": np.multiply, "lor": np.logical_or, "land": np.logical_and, } from functools import reduce return reduce(op_to_numpy_func[op], local_values)
def allsync(local_values, comm=None, op=None): """ Perform allreduce if MPI comm is provided. Deprecated. Do not use in new code. """ from warnings import warn warn("allsync is deprecated and will disappear in Q1 2022. " "Use global_reduce instead.", DeprecationWarning, stacklevel=2) if comm is None: return local_values from mpi4py import MPI if op is None: op = MPI.MAX if op == MPI.MIN: op_string = "min" elif op == MPI.MAX: op_string = "max" elif op == MPI.SUM: op_string = "sum" elif op == MPI.PROD: op_string = "prod" elif op == MPI.LOR: op_string = "lor" elif op == MPI.LAND: op_string = "land" else: raise ValueError(f"Unrecognized MPI reduce op {op}.") return global_reduce(local_values, op_string, comm=comm)
[docs] def check_range_local(dcoll: DiscretizationCollection, dd: str, field: DOFArray, min_value: float, max_value: float) -> List[float]: """Return the values that are outside the range [min_value, max_value].""" actx = field.array_context local_min = actx.to_numpy(op.nodal_min_loc(dcoll, dd, field)).item() local_max = actx.to_numpy(op.nodal_max_loc(dcoll, dd, field)).item() failing_values = [] if local_min < min_value: failing_values.append(local_min) if local_max > max_value: failing_values.append(local_max) return failing_values
[docs] def check_naninf_local(dcoll: DiscretizationCollection, dd: str, field: DOFArray) -> bool: """Return True if there are any NaNs or Infs in the field.""" actx = field.array_context s = actx.to_numpy(op.nodal_sum_loc(dcoll, dd, field)) return not np.isfinite(s)
[docs] def compare_fluid_solutions(dcoll, red_state, blue_state, *, dd=DD_VOLUME_ALL): """Return inf norm of (*red_state* - *blue_state*) for each component. .. note:: This is a collective routine and must be called by all MPI ranks. """ # added tag_axes calls to eliminate fallback warnings at compile time actx = red_state.array_context resid = tag_axes(actx, { 0: DiscretizationElementAxisTag(), 1: DiscretizationDOFAxisTag() }, red_state - blue_state) resid_errs = actx.to_numpy( tag_axes(actx, { 0: DiscretizationElementAxisTag() }, flatten( componentwise_norms(dcoll, resid, order=np.inf, dd=dd), actx) ) ) return resid_errs.tolist()
[docs] def componentwise_norms(dcoll, fields, order=np.inf, *, dd=DD_VOLUME_ALL): """Return the *order*-norm for each component of *fields*. .. note:: This is a collective routine and must be called by all MPI ranks. """ if not isinstance(fields, DOFArray): return map_array_container( partial(componentwise_norms, dcoll, order=order, dd=dd), fields) if len(fields) > 0: return op.norm(dcoll, fields, order, dd=dd) else: # FIXME: This work-around for #575 can go away after #569 return 0
[docs] def max_component_norm(dcoll, fields, order=np.inf, *, dd=DD_VOLUME_ALL): """Return the max *order*-norm over the components of *fields*. .. note:: This is a collective routine and must be called by all MPI ranks. """ actx = fields.array_context return max(actx.to_numpy(flatten( componentwise_norms(dcoll, fields, order, dd=dd), actx)))
class PartitioningError(Exception): """Error tossed to indicate an error with domain decomposition.""" pass
[docs] def geometric_mesh_partitioner(mesh, num_ranks=None, *, nranks_per_axis=None, auto_balance=False, imbalance_tolerance=.01, debug=False): """Partition a mesh uniformly along the X coordinate axis. The intent is to partition the mesh uniformly along user-specified directions. In this current interation, the capability is demonstrated by splitting along the X axis. Parameters ---------- mesh: :class:`meshmode.mesh.Mesh` The serial mesh to partition num_ranks: int The number of partitions to make (deprecated) nranks_per_axis: numpy.ndarray How many partitions per specified axis. auto_balance: bool Indicates whether to perform automatic balancing. If true, the partitioner will try to balance the number of elements over the partitions. imbalance_tolerance: float If *auto_balance* is True, this parameter indicates the acceptable relative difference to the average number of elements per partition. It defaults to balance within 1%. debug: bool En/disable debugging/diagnostic print reporting. Returns ------- elem_to_rank: numpy.ndarray Array indicating the MPI rank for each element """ mesh_dimension = mesh.dim if nranks_per_axis is None or num_ranks is not None: from warnings import warn warn("num_ranks is deprecated, use nranks_per_axis instead.") num_ranks = num_ranks or 1 nranks_per_axis = np.ones(mesh_dimension, dtype=np.int32) nranks_per_axis[0] = num_ranks if len(nranks_per_axis) != mesh_dimension: raise ValueError("nranks_per_axis must match mesh dimension.") num_ranks = if[1:]) != 1: raise NotImplementedError("geometric_mesh_partitioner currently only " "supports partitioning in the X-dimension." "(only nranks_per_axis[0] should be > 1).") mesh_verts = mesh.vertices mesh_x = mesh_verts[0] x_min = np.min(mesh_x) x_max = np.max(mesh_x) x_interval = x_max - x_min part_loc = np.linspace(x_min, x_max, num_ranks+1) part_interval = x_interval / nranks_per_axis[0] all_elem_group_centroids = [] for group in mesh.groups: elem_group_x = mesh_verts[0, group.vertex_indices] elem_group_centroids = np.sum(elem_group_x, axis=1)/elem_group_x.shape[1] all_elem_group_centroids.append(elem_group_centroids) elem_centroids = np.concatenate(all_elem_group_centroids) global_nelements = len(elem_centroids) aver_part_nelem = global_nelements / num_ranks if debug: print(f"Partitioning {global_nelements} elements in" f" [{x_min},{x_max}]/{num_ranks}") print(f"Average nelem/part: {aver_part_nelem}") print(f"Initial part locs: {part_loc=}") # Create geometrically even partitions elem_to_rank = ((elem_centroids-x_min) / part_interval).astype(int) # map partition id to list of elements in that partition part_to_elements = {r: set(np.where(elem_to_rank == r)[0]) for r in range(num_ranks)} # make an array of the geometrically even partition sizes # avoids calling "len" over and over on the element sets nelem_part = [len(part_to_elements[r]) for r in range(num_ranks)] if debug: print(f"Initial: {nelem_part=}") # Automatic load-balancing if auto_balance: for r in range(num_ranks-1): num_elem_needed = aver_part_nelem - nelem_part[r] part_imbalance = np.abs(num_elem_needed) / float(aver_part_nelem) if debug: print(f"Processing part({r=})") print(f"{part_loc[r]=}") print(f"{num_elem_needed=}, {part_imbalance=}") print(f"{nelem_part=}") niter = 0 total_change = 0 moved_elements = set() adv_part = r + 1 while part_imbalance > imbalance_tolerance: # This partition needs to keep changing in size until it meets the # specified imbalance tolerance, or gives up trying # seek out the element reservoir while nelem_part[adv_part] == 0: adv_part = adv_part + 1 if adv_part >= num_ranks: raise PartitioningError("Ran out of elements to partition.") if debug: print(f"-{nelem_part[r]=}, adv_part({adv_part})," f" {nelem_part[adv_part]=}") print(f"-{part_loc[r+1]=},{part_loc[adv_part+1]=}") print(f"-{num_elem_needed=},{part_imbalance=}") if niter > 100: raise PartitioningError("Detected too many iterations in" " partitioning.") # The purpose of the next block is to populate the "moved_elements" # data structure. Then those elements will be moved between the # current partition being processed and the "reservoir," # *and* to adjust the position of the "right" side of the current # partition boundary. moved_elements = set() num_elements_added = 0 if num_elem_needed > 0: # Partition is SMALLER than it should be, grab elements from # the reservoir if debug: print(f"-Grabbing elements from reservoir({adv_part})" f", {nelem_part[adv_part]=}") portion_needed = (float(abs(num_elem_needed)) / float(nelem_part[adv_part])) portion_needed = min(portion_needed, 1.0) if debug: print(f"--Chomping {portion_needed*100}% of" f" reservoir({adv_part}) [by nelem].") if portion_needed == 1.0: # Chomp new_loc = part_loc[adv_part+1] moved_elements.update(part_to_elements[adv_part]) else: # Bite # This is the spatial size of the reservoir reserv_interval = part_loc[adv_part+1] - part_loc[r+1] # Find what portion of the reservoir to grab spatially # This part is needed because the elements are not # distributed uniformly in space. fine_tuned = False trial_portion_needed = portion_needed while not fine_tuned: pos_update = trial_portion_needed*reserv_interval new_loc = part_loc[r+1] + pos_update moved_elements = set() num_elem_mv = 0 for e in part_to_elements[adv_part]: if elem_centroids[e] <= new_loc: moved_elements.add(e) num_elem_mv = num_elem_mv + 1 if num_elem_mv < num_elem_needed: fine_tuned = True else: ovrsht = (num_elem_mv - num_elem_needed) rel_ovrsht = ovrsht/float(num_elem_needed) if rel_ovrsht > 0.8: # bisect the space grabbed and try again trial_portion_needed = trial_portion_needed/2.0 else: fine_tuned = True portion_needed = trial_portion_needed new_loc = part_loc[r+1] + pos_update if debug: print(f"--Tuned: {portion_needed=} [by spatial volume]") print(f"--Advancing part({r}) by +{pos_update}") num_elements_added = len(moved_elements) if debug: print(f"--Adding {num_elements_added} to part({r}).") else: # Partition is LARGER than it should be # Grab the spatial size of the current partition # to estimate the portion we need to shave off # assuming uniform element density part_interval = part_loc[r+1] - part_loc[r] num_to_move = -num_elem_needed portion_needed = num_to_move/float(nelem_part[r]) if debug: print(f"--Shaving off {portion_needed*100}% of" f" partition({r}) [by nelem].") # Tune the shaved portion to account for # non-uniform element density fine_tuned = False while not fine_tuned: pos_update = portion_needed*part_interval new_pos = part_loc[r+1] - pos_update moved_elements = set() num_elem_mv = 0 for e in part_to_elements[r]: if elem_centroids[e] > new_pos: moved_elements.add(e) num_elem_mv = num_elem_mv + 1 if num_elem_mv < num_to_move: fine_tuned = True else: ovrsht = (num_elem_mv - num_to_move) rel_ovrsht = ovrsht/float(num_to_move) if rel_ovrsht > 0.8: # bisect and try again portion_needed = portion_needed/2.0 else: fine_tuned = True # new "right" wall location of shranken part # and negative num_elements_added for removal new_loc = new_pos num_elements_added = -len(moved_elements) if debug: print(f"--Reducing partition size by {portion_needed*100}%" " [by nelem].") print(f"--Removing {-num_elements_added} from part({r}).") # Now "moved_elements", "num_elements_added", and "new_loc" # are computed. Update the partition, and reservoir. if debug: print(f"--Number of elements to ADD: {num_elements_added}.") if num_elements_added > 0: part_to_elements[r].update(moved_elements) part_to_elements[adv_part].difference_update( moved_elements) for e in moved_elements: elem_to_rank[e] = r else: part_to_elements[r].difference_update(moved_elements) part_to_elements[adv_part].update(moved_elements) for e in moved_elements: elem_to_rank[e] = adv_part total_change = total_change + num_elements_added part_loc[r+1] = new_loc if debug: print(f"--Before: {nelem_part=}") nelem_part[r] = nelem_part[r] + num_elements_added nelem_part[adv_part] = nelem_part[adv_part] - num_elements_added if debug: print(f"--After: {nelem_part=}") # Compute new nelem_needed and part_imbalance num_elem_needed = num_elem_needed - num_elements_added part_imbalance = \ np.abs(num_elem_needed) / float(aver_part_nelem) niter = niter + 1 # Summarize the total change and state of the partition # and reservoir if debug: print(f"-Part({r}): {total_change=}") print(f"-Part({r=}): {nelem_part[r]=}, {part_imbalance=}") print(f"-Part({adv_part}): {nelem_part[adv_part]=}") # Validate the partitioning before returning total_partitioned_elements = sum([len(part_to_elements[r]) for r in range(num_ranks)]) total_nelem_part = sum([nelem_part[r] for r in range(num_ranks)]) if debug: print("Validating mesh parts.") if total_partitioned_elements != total_nelem_part: raise PartitioningError("Validator: parted element counts dont match") if total_partitioned_elements != global_nelements: raise PartitioningError("Validator: global element counts dont match.") if len(elem_to_rank) != global_nelements: raise PartitioningError("Validator: elem-to-rank wrong size.") if np.any(nelem_part) <= 0: raise PartitioningError("Validator: empty partitions.") for e in range(global_nelements): part = elem_to_rank[e] if e not in part_to_elements[part]: raise PartitioningError("Validator: part/element/part map mismatch.") part_counts = np.zeros(global_nelements) for part_elements in part_to_elements.values(): for element in part_elements: part_counts[element] = part_counts[element] + 1 if np.any(part_counts > 1): raise PartitioningError("Validator: degenerate elements") if np.any(part_counts < 1): raise PartitioningError("Validator: orphaned elements") return elem_to_rank
def generate_and_distribute_mesh(comm, generate_mesh, **kwargs): """Generate a mesh and distribute it among all ranks in *comm*. Generate the mesh with the user-supplied mesh generation function *generate_mesh*, partition the mesh, and distribute it to every rank in the provided MPI communicator *comm*. .. note:: This is a collective routine and must be called by all MPI ranks. Parameters ---------- comm: MPI communicator over which to partition the mesh generate_mesh: Callable of zero arguments returning a :class:`meshmode.mesh.Mesh`. Will only be called on one (undetermined) rank. Returns ------- local_mesh : :class:`meshmode.mesh.Mesh` The local partition of the the mesh returned by *generate_mesh*. global_nelements : :class:`int` The number of elements in the serial mesh """ from warnings import warn warn( "generate_and_distribute_mesh is deprecated and will go away Q4 2022. " "Use distribute_mesh instead.", DeprecationWarning, stacklevel=2) return distribute_mesh(comm, generate_mesh, **kwargs)
[docs] def distribute_mesh(comm, get_mesh_data, partition_generator_func=None, logmgr=None): r"""Distribute a mesh among all ranks in *comm*. Retrieve the global mesh data with the user-supplied function *get_mesh_data*, partition the mesh, and distribute it to every rank in the provided MPI communicator *comm*. .. note:: This is a collective routine and must be called by all MPI ranks. Parameters ---------- comm: MPI communicator over which to partition the mesh get_mesh_data: Callable of zero arguments returning *mesh* or *(mesh, tag_to_elements, volume_to_tags)*, where *mesh* is a :class:`meshmode.mesh.Mesh`, *tag_to_elements* is a :class:`dict` mapping mesh volume tags to :class:`numpy.ndarray`\ s of element numbers, and *volume_to_tags* is a :class:`dict` that maps volumes in the resulting distributed mesh to volume tags in *tag_to_elements*. partition_generator_func: Optional callable that takes *mesh*, *tag_to_elements*, and *comm*'s size, and returns a :class:`numpy.ndarray` indicating to which rank each element belongs. Returns ------- local_mesh_data: :class:`meshmode.mesh.Mesh` or :class:`dict` If the result of calling *get_mesh_data* specifies a single volume, *local_mesh_data* is the local mesh. If it specifies multiple volumes, *local_mesh_data* will be a :class:`dict` mapping volume tags to tuples of the form *(local_mesh, local_tag_to_elements)*. global_nelements: :class:`int` The number of elements in the global mesh """ from mpi4py.util import pkl5 comm_wrapper = pkl5.Intracomm(comm) num_ranks = comm_wrapper.Get_size() t_mesh_dist = IntervalTimer("t_mesh_dist", "Time spent distributing mesh data.") t_mesh_data = IntervalTimer("t_mesh_data", "Time spent getting mesh data.") t_mesh_part = IntervalTimer("t_mesh_part", "Time spent partitioning the mesh.") t_mesh_split = IntervalTimer("t_mesh_split", "Time spent splitting mesh parts.") if partition_generator_func is None: def partition_generator_func(mesh, tag_to_elements, num_ranks): from meshmode.distributed import get_partition_by_pymetis return get_partition_by_pymetis(mesh, num_ranks) if comm_wrapper.Get_rank() == 0: if logmgr: logmgr.add_quantity(t_mesh_data) with t_mesh_data.get_sub_timer(): global_data = get_mesh_data() else: global_data = get_mesh_data() from meshmode.mesh import Mesh if isinstance(global_data, Mesh): mesh = global_data tag_to_elements = None volume_to_tags = None elif isinstance(global_data, tuple) and len(global_data) == 3: mesh, tag_to_elements, volume_to_tags = global_data else: raise TypeError("Unexpected result from get_mesh_data") if logmgr: logmgr.add_quantity(t_mesh_part) with t_mesh_part.get_sub_timer(): rank_per_element = partition_generator_func(mesh, tag_to_elements, num_ranks) else: rank_per_element = partition_generator_func(mesh, tag_to_elements, num_ranks) def get_rank_to_mesh_data(): from meshmode.mesh.processing import partition_mesh if tag_to_elements is None: rank_to_elements = { rank: np.where(rank_per_element == rank)[0] for rank in range(num_ranks)} rank_to_mesh_data_dict = partition_mesh(mesh, rank_to_elements) rank_to_mesh_data = [ rank_to_mesh_data_dict[rank] for rank in range(num_ranks)] else: tag_to_volume = { tag: vol for vol, tags in volume_to_tags.items() for tag in tags} volumes = list(volume_to_tags.keys()) volume_index_per_element = np.full(mesh.nelements, -1, dtype=int) for tag, elements in tag_to_elements.items(): volume_index_per_element[elements] = volumes.index( tag_to_volume[tag]) if np.any(volume_index_per_element < 0): raise ValueError("Missing volume specification " "for some elements.") part_id_to_elements = { PartID(volumes[vol_idx], rank): np.where( (volume_index_per_element == vol_idx) & (rank_per_element == rank))[0] for vol_idx in range(len(volumes)) for rank in range(num_ranks)} # TODO: Add a public meshmode function to accomplish this? So we're # not depending on meshmode internals part_id_to_part_index = { part_id: part_index for part_index, part_id in enumerate(part_id_to_elements.keys())} from meshmode.mesh.processing import \ _compute_global_elem_to_part_elem global_elem_to_part_elem = _compute_global_elem_to_part_elem( mesh.nelements, part_id_to_elements, part_id_to_part_index, mesh.element_id_dtype) tag_to_global_to_part = { tag: global_elem_to_part_elem[elements, :] for tag, elements in tag_to_elements.items()} part_id_to_tag_to_elements = {} for part_id in part_id_to_elements.keys(): part_idx = part_id_to_part_index[part_id] part_tag_to_elements = {} for tag, global_to_part in tag_to_global_to_part.items(): part_tag_to_elements[tag] = global_to_part[ global_to_part[:, 0] == part_idx, 1] part_id_to_tag_to_elements[part_id] = part_tag_to_elements part_id_to_mesh = partition_mesh(mesh, part_id_to_elements) rank_to_mesh_data = [ { vol: ( part_id_to_mesh[PartID(vol, rank)], part_id_to_tag_to_elements[PartID(vol, rank)]) for vol in volumes} for rank in range(num_ranks)] return rank_to_mesh_data if logmgr: logmgr.add_quantity(t_mesh_split) with t_mesh_split.get_sub_timer(): rank_to_mesh_data = get_rank_to_mesh_data() else: rank_to_mesh_data = get_rank_to_mesh_data() global_nelements = comm_wrapper.bcast(mesh.nelements, root=0) if logmgr: logmgr.add_quantity(t_mesh_dist) with t_mesh_dist.get_sub_timer(): local_mesh_data = comm_wrapper.scatter(rank_to_mesh_data, root=0) else: local_mesh_data = comm_wrapper.scatter(rank_to_mesh_data, root=0) else: global_nelements = comm_wrapper.bcast(None, root=0) if logmgr: logmgr.add_quantity(t_mesh_dist) with t_mesh_dist.get_sub_timer(): local_mesh_data = comm_wrapper.scatter(None, root=0) else: local_mesh_data = comm_wrapper.scatter(None, root=0) return local_mesh_data, global_nelements
def extract_volumes(mesh, tag_to_elements, selected_tags, boundary_tag): r""" Create a mesh containing a subset of another mesh's volumes. Parameters ---------- mesh: :class:`meshmode.mesh.Mesh` The original mesh. tag_to_elements: A :class:`dict` mapping mesh volume tags to :class:`numpy.ndarray`\ s of element numbers in *mesh*. selected_tags: A sequence of tags in *tag_to_elements* representing the subset of volumes to be included. boundary_tag: Tag to assign to the boundary that was previously the interface between included/excluded volumes. Returns ------- in_mesh: :class:`meshmode.mesh.Mesh` The resulting mesh. tag_to_in_elements: A :class:`dict` mapping the tags from *selected_tags* to :class:`numpy.ndarray`\ s of element numbers in *in_mesh*. """ is_in_element = np.full(mesh.nelements, False) for tag, elements in tag_to_elements.items(): if tag in selected_tags: is_in_element[elements] = True from meshmode.mesh.processing import partition_mesh in_mesh = partition_mesh(mesh, { "_in": np.where(is_in_element)[0], "_out": np.where(~is_in_element)[0]})["_in"] # partition_mesh creates a partition boundary for "_out"; replace with a # normal boundary new_facial_adjacency_groups = [] from meshmode.mesh import BoundaryAdjacencyGroup, InterPartAdjacencyGroup for grp_list in in_mesh.facial_adjacency_groups: new_grp_list = [] for fagrp in grp_list: if ( isinstance(fagrp, InterPartAdjacencyGroup) and fagrp.part_id == "_out"): new_fagrp = BoundaryAdjacencyGroup( igroup=fagrp.igroup, boundary_tag=boundary_tag, elements=fagrp.elements, element_faces=fagrp.element_faces) else: new_fagrp = fagrp new_grp_list.append(new_fagrp) new_facial_adjacency_groups.append(new_grp_list) in_mesh = in_mesh.copy(facial_adjacency_groups=new_facial_adjacency_groups) element_to_in_element = np.where( is_in_element, np.cumsum(is_in_element) - 1, np.full(mesh.nelements, -1)) tag_to_in_elements = { tag: element_to_in_element[tag_to_elements[tag]] for tag in selected_tags} return in_mesh, tag_to_in_elements
[docs] def boundary_report(dcoll, boundaries, outfile_name, *, dd=DD_VOLUME_ALL, mesh=None): """Generate a report of the grid boundaries.""" boundaries = normalize_boundaries(boundaries) comm = dcoll.mpi_communicator nproc = 1 rank = 0 if comm is not None: nproc = comm.Get_size() rank = comm.Get_rank() if mesh is not None: nelem = 0 for grp in mesh.groups: nelem = nelem + grp.nelements local_header = f"nproc: {nproc}\nrank: {rank}\nnelem: {nelem}\n" else: local_header = f"nproc: {nproc}\nrank: {rank}\n" from io import StringIO local_report = StringIO(local_header), 2) for bdtag in boundaries: boundary_discr = dcoll.discr_from_dd(bdtag) nnodes = sum([grp.ndofs for grp in boundary_discr.groups]) local_report.write(f"{bdtag}: {nnodes}\n") from meshmode.distributed import get_connected_parts from meshmode.mesh import BTAG_PARTITION connected_part_ids = get_connected_parts(dcoll.discr_from_dd(dd).mesh) local_report.write(f"num_nbr_parts: {len(connected_part_ids)}\n") local_report.write(f"connected_part_ids: {connected_part_ids}\n") part_nodes = [] for connected_part_id in connected_part_ids: boundary_discr = dcoll.discr_from_dd( dd.trace(BTAG_PARTITION(connected_part_id))) nnodes = sum([grp.ndofs for grp in boundary_discr.groups]) part_nodes.append(nnodes) if part_nodes: local_report.write(f"nnodes_pb: {part_nodes}\n") local_report.write("-----\n") for irank in range(nproc): if irank == rank: f = open(outfile_name, "a+") f.write( f.close() if comm is not None: comm.barrier()
def force_evaluation(actx, expn): """Wrap freeze/thaw forcing evaluation of expressions. Deprecated; use :func:`mirgecom.utils.force_evaluation` instead. """ from warnings import warn warn("simutil.force_evaluation is deprecated and will disappear in Q3 2023. " "Use utils.force_evaluation instead.", DeprecationWarning, stacklevel=2) return actx.thaw(actx.freeze(expn))
[docs] def get_reasonable_memory_pool(ctx: cl.Context, queue: cl.CommandQueue, force_buffer: bool = False, force_non_pool: bool = False): """Return an SVM or buffer memory pool based on what the device supports. By default, it prefers SVM allocations over CL buffers, and memory pools over direct allocations. """ import as cl_tools from pyopencl.characterize import has_coarse_grain_buffer_svm if force_buffer and force_non_pool:"Using non-pooled CL buffer allocations on {queue.device}.") return cl_tools.DeferredAllocator(ctx) if force_buffer:"Using pooled CL buffer allocations on {queue.device}.") return cl_tools.MemoryPool(cl_tools.ImmediateAllocator(queue)) if force_non_pool and has_coarse_grain_buffer_svm(queue.device):"Using non-pooled SVM allocations on {queue.device}.") return cl_tools.SVMAllocator( # pylint: disable=no-member ctx, alignment=0, queue=queue) if has_coarse_grain_buffer_svm(queue.device) and hasattr(cl_tools, "SVMPool"):"Using SVM-based memory pool on {queue.device}.") return cl_tools.SVMPool(cl_tools.SVMAllocator( # pylint: disable=no-member ctx, alignment=0, queue=queue)) else: from warnings import warn if not has_coarse_grain_buffer_svm(queue.device): warn(f"No SVM support on {queue.device}, returning a CL buffer-based " "memory pool. If you are running with PoCL-cuda, please update " "your PoCL installation.") else: warn("No SVM memory pool support with your version of PyOpenCL, " f"returning a CL buffer-based memory pool on {queue.device}. " "Please update your PyOpenCL version.") return cl_tools.MemoryPool(cl_tools.ImmediateAllocator(queue))
[docs] def configurate(config_key, config_object=None, default_value=None): """Return a configured item from a configuration object.""" if config_object is not None: d = config_object if isinstance(config_object, dict) else\ config_object.__dict__ if config_key in d: value = d[config_key] if default_value is not None: return type(default_value)(value) return value return default_value
[docs] def compare_files_vtu( first_file: str, second_file: str, file_type: str, tolerance: float = 1e-12, field_tolerance: Optional[Dict[str, float]] = None ) -> None: """Compare files of vtu type. Parameters ---------- first_file: First file to compare second_file: Second file to compare file_type: Vtu files tolerance: Max acceptable absolute difference field_tolerance: Dictionary of individual field tolerances Returns ------- True: If it passes the files contain data within the given tolerance. False: If it fails the files contain data outside the given tolerance. """ import xml.etree.ElementTree as Et import vtk # read files: if file_type == "vtu": reader1 = vtk.vtkXMLUnstructuredGridReader() # pylint: disable=no-member reader2 = vtk.vtkXMLUnstructuredGridReader() # pylint: disable=no-member else: reader1 = vtk.vtkXMLPUnstructuredGridReader() # pylint: disable=no-member reader2 = vtk.vtkXMLPUnstructuredGridReader() # pylint: disable=no-member reader1.SetFileName(first_file) reader1.Update() output1 = reader1.GetOutput() # Check rank number def numranks(filename: str) -> int: tree = Et.parse(filename) root = tree.getroot() return len(root.findall(".//Piece")) if file_type == "pvtu": rank1 = numranks(first_file) rank2 = numranks(second_file) if rank1 != rank2: raise ValueError(f"File '{first_file}' has {rank1} ranks, " f"but file '{second_file}' has {rank2} ranks.") reader2.SetFileName(second_file) reader2.Update() output2 = reader2.GetOutput() # check fidelity point_data1 = output1.GetPointData() point_data2 = output2.GetPointData() # verify same number of PointData arrays in both files if point_data1.GetNumberOfArrays() != point_data2.GetNumberOfArrays(): print("File 1:", point_data1.GetNumberOfArrays(), "\n", "File 2:", point_data2.GetNumberOfArrays()) raise ValueError("Fidelity test failed: Mismatched data array count") nfields = point_data1.GetNumberOfArrays() max_field_errors = [0 for _ in range(nfields)] if field_tolerance is None: field_tolerance = {} field_specific_tols = [configurate(point_data1.GetArrayName(i), field_tolerance, tolerance) for i in range(nfields)] field_names = [] max_file_diff = 0. max_field_name = "" for i in range(nfields): arr1 = point_data1.GetArray(i) arr2 = point_data2.GetArray(i) field_tol = field_specific_tols[i] num_points = arr2.GetNumberOfTuples() num_components = arr1.GetNumberOfComponents() # verify both files contain same arrays if point_data1.GetArrayName(i) != point_data2.GetArrayName(i): print("File 1:", point_data1.GetArrayName(i), "\n", "File 2:", point_data2.GetArrayName(i)) raise ValueError("Fidelity test failed: Mismatched data array names") # verify arrays are same sizes in both files if arr1.GetSize() != arr2.GetSize(): print("File 1, DataArray", i, ":", arr1.GetSize(), "\n", "File 2, DataArray", i, ":", arr2.GetSize()) raise ValueError("Fidelity test failed: Mismatched data array sizes") # verify individual values w/in given tolerance fieldname = point_data1.GetArrayName(i) field_names.append(fieldname) # Ignore any fields that are named here # FIXME: rhs, grad are here because they fail thermally-coupled example ignored_fields = ["resid", "tagged", "grad", "rhs"] ignore_field = False for ifld in ignored_fields: if ifld in fieldname: ignore_field = True if ignore_field: continue max_true_component = max([max(abs(arr2.GetComponent(j, icomp)) for j in range(num_points)) for icomp in range(num_components)]) max_other_component = max([max(abs(arr1.GetComponent(j, icomp)) for j in range(num_points)) for icomp in range(num_components)]) # FIXME # Choose an arbitrary "level" to define what is a "significant" field # Don't compare dinky/insignificant fields significant_field = 1000.*field_tol if ((max_true_component < significant_field) and (max_other_component < significant_field)): continue max_true_value = max_true_component use_relative_diff = max_true_value > field_tol err_scale = 1./max_true_value if use_relative_diff else 1. print(f"{fieldname}: Max true value: {max_true_value}") print(f"{fieldname}: Error scale: {err_scale}") # Spew out some stats on each field component print(f"Field({fieldname}) Component (min, max, mean)s:") for icomp in range(num_components): min_true_value = min(arr2.GetComponent(j, icomp) for j in range(num_points)) min_other_value = min(arr1.GetComponent(j, icomp) for j in range(num_points)) max_true_value = max(arr2.GetComponent(j, icomp) for j in range(num_points)) max_other_value = max(arr1.GetComponent(j, icomp) for j in range(num_points)) true_mean = sum(arr2.GetComponent(j, icomp) for j in range(num_points)) / num_points other_mean = sum(arr1.GetComponent(j, icomp) for j in range(num_points)) / num_points print(f"{fieldname}({icomp}): ({min_other_value}," f" {max_other_value}, {other_mean})") print(f"{fieldname}({icomp}): ({min_true_value}," f" {max_true_value}, {true_mean})") print(f"Field({fieldname}) comparison:") for icomp in range(num_components): if num_components > 1: print(f"Comparing component {icomp}") for j in range(num_points): test_err = abs(arr1.GetComponent(j, icomp) - arr2.GetComponent(j, icomp))*err_scale if test_err > max_field_errors[i]: max_field_errors[i] = test_err print(f"Max Error: {max_field_errors[i]}", end=" ") print(f"Tolerance: {field_specific_tols[i]}") violated_tols = [] violating_values = [] violating_fields = [] for i in range(nfields): if max_field_errors[i] > max_file_diff: max_file_diff = max_field_errors[i] max_field_name = field_names[i] if max_field_errors[i] > field_specific_tols[i]: violated_tols.append(field_specific_tols[i]) violating_values.append(max_field_errors[i]) violating_fields.append(field_names[i]) print(f"Max File Difference: {max_field_name} : {max_file_diff}") if violated_tols: raise ValueError("Data comparison failed:\n" f"Fields: {violating_fields=}\n" f"Max differences: {violating_values=}\n" f"Tolerances: {violated_tols=}\n") print("VTU Fidelity test completed successfully")
class _Hdf5Reader: def __init__(self, filename): import h5py self.file_obj = h5py.File(filename, "r") def read_specific_data(self, datapath): return self.file_obj[datapath] class _XdmfReader: # CURRENTLY DOES NOT SUPPORT MULTIPLE Grids def __init__(self, filename): from xml.etree import ElementTree tree = ElementTree.parse(filename) root = tree.getroot() domains = tuple(root) self.domain = domains[0] self.grids = tuple(self.domain) self.uniform_grid = self.grids[0] def get_topology(self): connectivity = None for a in self.uniform_grid: if a.tag == "Topology": connectivity = a if connectivity is None: raise ValueError("File is missing grid connectivity data") return connectivity def get_geometry(self): geometry = None for a in self.uniform_grid: if a.tag == "Geometry": geometry = a if geometry is None: raise ValueError("File is missing grid node location data") return geometry def read_data_item(self, data_item): # CURRENTLY DOES NOT SUPPORT 'DataItem' THAT STORES VALUES DIRECTLY # check that data stored as separate hdf5 file if data_item.get("Format") != "HDF": raise TypeError("Data stored in unrecognized format") # get corresponding hdf5 file source_info = data_item.text split_source_info = source_info.partition(":") h5_filename = split_source_info[0] # TODO: change file name to match actual mirgecom output directory later h5_filename = "examples/" + h5_filename h5_datapath = split_source_info[2] # read data from corresponding hdf5 file h5_reader = _Hdf5Reader(h5_filename) return h5_reader.read_specific_data(h5_datapath)
[docs] def compare_files_xdmf(first_file: str, second_file: str, tolerance: float = 1e-12): """Compare files of xdmf type. Parameters ---------- first_file: First file to compare second_file: Second file to compare file_type: Xdmf files tolerance: Max acceptable absolute difference Returns ------- True: If it passes the file type test or contains same data. False: If it fails the file type test or contains different data. """ # read files file_reader1 = _XdmfReader(first_file) file_reader2 = _XdmfReader(second_file) # check same number of grids if len(file_reader1.grids) != len(file_reader2.grids): print("File 1:", len(file_reader1.grids), "\n", "File 2:", len(file_reader2.grids)) raise ValueError("Fidelity test failed: Mismatched grid count") # check same number of cells in gridTrue: if len(file_reader1.uniform_grid) != len(file_reader2.uniform_grid): print("File 1:", len(file_reader1.uniform_grid), "\n", "File 2:", len(file_reader2.uniform_grid)) raise ValueError("Fidelity test failed: Mismatched cell count in " "uniform grid") # compare Topology: top1 = file_reader1.get_topology() top2 = file_reader2.get_topology() # check TopologyType if top1.get("TopologyType") != top2.get("TopologyType"): print("File 1:", top1.get("TopologyType"), "\n", "File 2:", top2.get("TopologyType")) raise ValueError("Fidelity test failed: Mismatched topology type") # check number of connectivity values connectivities1 = file_reader1.read_data_item(tuple(top1)[0]) connectivities2 = file_reader2.read_data_item(tuple(top2)[0]) connectivities1 = np.array(connectivities1) connectivities2 = np.array(connectivities2) if connectivities1.shape != connectivities2.shape: print("File 1:", connectivities1.shape, "\n", "File 2:", connectivities2.shape) raise ValueError("Fidelity test failed: Mismatched connectivities count") if not np.allclose(connectivities1, connectivities2, atol=tolerance): print("Tolerance:", tolerance) raise ValueError("Fidelity test failed: Mismatched connectivity values " "with given tolerance") # compare Geometry: geo1 = file_reader1.get_geometry() geo2 = file_reader2.get_geometry() # check GeometryType if geo1.get("GeometryType") != geo2.get("GeometryType"): print("File 1:", geo1.get("GeometryType"), "\n", "File 2:", geo2.get("GeometryType")) raise ValueError("Fidelity test failed: Mismatched geometry type") # check number of node values nodes1 = file_reader1.read_data_item(tuple(geo1)[0]) nodes2 = file_reader2.read_data_item(tuple(geo2)[0]) nodes1 = np.array(nodes1) nodes2 = np.array(nodes2) if nodes1.shape != nodes2.shape: print("File 1:", nodes1.shape, "\n", "File 2:", nodes2.shape) raise ValueError("Fidelity test failed: Mismatched nodes count") if not np.allclose(nodes1, nodes2, atol=tolerance): print("Tolerance:", tolerance) raise ValueError("Fidelity test failed: Mismatched node values with " "given tolerance") # compare other Attributes: maxerrorvalue = 0 for i in range(len(file_reader1.uniform_grid)): curr_cell1 = file_reader1.uniform_grid[i] curr_cell2 = file_reader2.uniform_grid[i] # skip already checked cells if curr_cell1.tag == "Geometry" or curr_cell1.tag == "Topology": continue # check AttributeType if curr_cell1.get("AttributeType") != curr_cell2.get("AttributeType"): print("File 1:", curr_cell1.get("AttributeType"), "\n", "File 2:", curr_cell2.get("AttributeType")) raise ValueError("Fidelity test failed: Mismatched cell type") # check Attribtue name if curr_cell1.get("Name") != curr_cell2.get("Name"): print("File 1:", curr_cell1.get("Name"), "\n", "File 2:", curr_cell2.get("Name")) raise ValueError("Fidelity test failed: Mismatched cell name") # check number of Attribute values values1 = file_reader1.read_data_item(tuple(curr_cell1)[0]) values2 = file_reader2.read_data_item(tuple(curr_cell2)[0]) if len(values1) != len(values2): print("File 1,", curr_cell1.get("Name"), ":", len(values1), "\n", "File 2,", curr_cell2.get("Name"), ":", len(values2)) raise ValueError("Fidelity test failed: Mismatched data values count") # check values w/in tolerance for i in range(len(values1)): if abs(values1[i] - values2[i]) > tolerance: print("Tolerance:", tolerance, "\n", "Cell:", curr_cell1.get("Name")) if maxerrorvalue < abs(values1[i] - values2[i]): maxerrorvalue = abs(values1[i] - values2[i]) if not maxerrorvalue == 0: raise ValueError("Fidelity test failed: Mismatched data array " "values with given tolerance. " "Max Error Value:", maxerrorvalue) print("XDMF Fidelity test completed successfully with tolerance", tolerance)
[docs] def compare_files_hdf5(first_file: str, second_file: str, tolerance: float = 1e-12): """Compare files of hdf5 type. Parameters ---------- first_file: First file to compare second_file: Second file to compare file_type: Hdf5 files tolerance: Max acceptable absolute difference Returns ------- True: If it passes the file type test or contains same data. False: If it fails the file type test or contains different data. """ file_reader1 = _Hdf5Reader(first_file) file_reader2 = _Hdf5Reader(second_file) f1 = file_reader1.file_obj f2 = file_reader2.file_obj objects1 = list(f1.keys()) objects2 = list(f2.keys()) # check number of Grids if len(objects1) != len(objects2): print("File 1:", len(objects1), "\n", "File 2:", len(objects2)) raise ValueError("Fidelity test failed: Mismatched grid count") # loop through Grids maxvalueerror = 0 for i in range(len(objects1)): obj_name1 = objects1[i] obj_name2 = objects2[i] if obj_name1 != obj_name2: print("File 1:", obj_name1, "\n", "File 2:", obj_name2) raise ValueError("Fidelity test failed: Mismatched grid names") curr_o1 = list(f1[obj_name1]) curr_o2 = list(f2[obj_name2]) if len(curr_o1) != len(curr_o2): print("File 1,", obj_name1, ":", len(curr_o1), "\n", "File 2,", obj_name2, ":", len(curr_o2)) raise ValueError("Fidelity test failed: Mismatched group count") # loop through Groups for j in range(len(curr_o1)): subobj_name1 = curr_o1[j] subobj_name2 = curr_o2[j] if subobj_name1 != subobj_name2: print("File 1:", subobj_name1, "\n", "File 2:", subobj_name2) raise ValueError("Fidelity test failed: Mismatched group names") subpath1 = obj_name1 + "/" + subobj_name1 subpath2 = obj_name2 + "/" + subobj_name2 data_arrays_list1 = list(f1[subpath1]) data_arrays_list2 = list(f2[subpath2]) if len(data_arrays_list1) != len(data_arrays_list2): print("File 1,", subobj_name1, ":", len(data_arrays_list1), "\n", "File 2,", subobj_name2, ":", len(data_arrays_list2)) raise ValueError("Fidelity test failed: Mismatched data list count") # loop through data arrays for k in range(len(data_arrays_list1)): curr_listname1 = data_arrays_list1[k] curr_listname2 = data_arrays_list2[k] if curr_listname1 != curr_listname2: print("File 1:", curr_listname1, "\n", "File 2:", curr_listname2) raise ValueError("Fidelity test failed: Mismatched data " "list names") curr_listname1 = subpath1 + "/" + curr_listname1 curr_listname2 = subpath2 + "/" + curr_listname2 curr_datalist1 = np.array(list(f1[curr_listname1])) curr_datalist2 = np.array(list(f2[curr_listname2])) if curr_datalist1.shape != curr_datalist2.shape: print("File 1,", curr_listname1, ":", curr_datalist1.shape, "\n", "File 2,", curr_listname2, ":", curr_datalist2.shape) raise ValueError("Fidelity test failed: Mismatched data " "list size") if not np.allclose(curr_datalist1, curr_datalist2, atol=tolerance): print("Tolerance:", tolerance, "\n", "Data List:", curr_listname1) if maxvalueerror < abs(curr_datalist1 - curr_datalist2): maxvalueerror = abs(curr_datalist1 - curr_datalist2) if not maxvalueerror == 0: raise ValueError("Fidelity test failed: Mismatched data " "values with given tolerance. " "Max Value Error: ", maxvalueerror) print("HDF5 Fidelity test completed successfully with tolerance", tolerance)