
"""MPI helper functionality.

.. autofunction:: mpi_entry_point
.. autofunction:: pudb_remote_debug_on_single_rank
.. autofunction:: enable_rank_labeled_print
"""

__copyright__ = """
Copyright (C) 2020 University of Illinois Board of Trustees
"""

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

from functools import wraps
import os
import sys

from contextlib import contextmanager
from typing import Callable, Any, Generator, TYPE_CHECKING

import logging
logger = logging.getLogger(__name__)


if TYPE_CHECKING:
    from mpi4py.MPI import Comm


@contextmanager
def shared_split_comm_world() -> Generator["Comm", None, None]:
    """Create a context manager for a MPI.COMM_TYPE_SHARED comm."""
    from mpi4py import MPI
    comm = MPI.COMM_WORLD.Split_type(MPI.COMM_TYPE_SHARED)

    try:
        yield comm
    finally:
        comm.Free()

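
def _example_shared_split_comm_world() -> None:
    # Illustrative sketch only, not part of the module API; the function
    # name is hypothetical. It shows one typical use of
    # shared_split_comm_world(): gathering, on each node, the hostnames of
    # all ranks that share that node's memory.
    from mpi4py import MPI

    with shared_split_comm_world() as node_comm:
        names = node_comm.gather(MPI.Get_processor_name(), root=0)
        if node_comm.Get_rank() == 0:
            print(f"ranks on this node: {names}")
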

def _check_cache_dirs() -> None:
    """Check whether multiple ranks share cache directories on the same node."""
    from mpi4py import MPI

    size = MPI.COMM_WORLD.Get_size()

    if size <= 1:
        return

    with shared_split_comm_world() as node_comm:
        node_rank = node_comm.Get_rank()

        def _check_var(var: str) -> None:
            from warnings import warn

            try:
                my_path = os.environ[var]
            except KeyError:
                warn(f"Please set the '{var}' variable in your job script to "
                    "avoid file system overheads when running on large numbers of "
                    "ranks. See https://mirgecom.readthedocs.io/en/latest/running/large-systems.html "  # noqa: E501
                    "for more information.")
                # Create a fake path so there will not be a second warning below.
                my_path = f"no/such/path/rank{node_rank}"

            all_paths = node_comm.gather(my_path, root=0)

            if node_rank == 0:
                assert all_paths
                if len(all_paths) != len(set(all_paths)):
                    hostname = MPI.Get_processor_name()
                    dup = [path for path in set(all_paths)
                                if all_paths.count(path) > 1]

                    from warnings import warn
                    warn(f"Multiple ranks are sharing '{var}' on node '{hostname}'. "
                        f"Duplicate '{var}'s: {dup}.")

        _check_var("XDG_CACHE_HOME")

        if os.environ.get("XDG_CACHE_HOME") is None:
            # When XDG_CACHE_HOME is set but POCL_CACHE_DIR is not, pocl
            # falls back to XDG_CACHE_HOME as its cache directory, so
            # POCL_CACHE_DIR only needs to be checked when XDG_CACHE_HOME
            # is unset.
            _check_var("POCL_CACHE_DIR")

        # We haven't observed an issue yet that 'CUDA_CACHE_PATH' fixes,
        # so disable this check for now.
        # _check_var("CUDA_CACHE_PATH")

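
def _example_per_rank_cache_dirs() -> None:
    # Illustrative sketch only; the function name and path layout are
    # hypothetical. The per-rank cache separation that _check_cache_dirs()
    # warns about is normally arranged in the job script (e.g. by exporting
    # a rank-dependent XDG_CACHE_HOME before launching Python); doing it
    # here only works if it runs before any library reads the variable.
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.Get_rank()
    os.environ["XDG_CACHE_HOME"] = os.path.join(
        os.path.expanduser("~"), ".cache", f"rank{rank}")
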

def _check_gpu_oversubscription() -> None:
    """
    Check whether multiple ranks are running on the same GPU on each node.

    Only works with CUDA devices currently due to the use of the
    PCI_DOMAIN_ID_NV extension.
    """
    from mpi4py import MPI
    import pyopencl as cl

    size = MPI.COMM_WORLD.Get_size()

    if size <= 1:
        return

    cl_ctx = cl.create_some_context()
    dev = cl_ctx.devices

    # No support for multi-device contexts
    if len(dev) > 1:
        raise NotImplementedError("multi-device contexts not yet supported")

    dev = dev[0]

    # Allow running multiple ranks on non-GPU devices
    if not (dev.type & cl.device_type.GPU):
        return

    with shared_split_comm_world() as node_comm:
        try:
            domain_id = hex(dev.pci_domain_id_nv)
        except (cl._cl.LogicError, AttributeError):
            from warnings import warn
            warn("Cannot detect whether multiple ranks are running on the"
                 " same GPU because it requires Nvidia GPUs running with"
                 " pyopencl>2021.1.1 and (Nvidia CL or pocl>1.6).")
            return

        node_rank = node_comm.Get_rank()

        bus_id = hex(dev.pci_bus_id_nv)
        slot_id = hex(dev.pci_slot_id_nv)

        dev_id = (domain_id, bus_id, slot_id)

        dev_ids = node_comm.gather(dev_id, root=0)

        if node_rank == 0:
            assert dev_ids
            if len(dev_ids) != len(set(dev_ids)):
                hostname = MPI.Get_processor_name()
                dup = [item for item in dev_ids if dev_ids.count(item) > 1]

                from warnings import warn
                warn(f"Multiple ranks are sharing GPUs on node '{hostname}'. "
                     f"Duplicate PCIe IDs: {dup}.")

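
def _example_one_gpu_per_rank() -> None:
    # Illustrative sketch only; the function name and the round-robin
    # assignment are hypothetical, not mirgecom policy. A common way to
    # avoid the oversubscription that _check_gpu_oversubscription() warns
    # about is to pin each node-local rank to its own GPU *before* any
    # OpenCL/CUDA context is created. CUDA_VISIBLE_DEVICES is the standard
    # CUDA mechanism for this; the modulo guards against having more ranks
    # than GPUs (in which case sharing, and the warning, are unavoidable).
    num_gpus_per_node = 4  # hypothetical node configuration

    with shared_split_comm_world() as node_comm:
        os.environ["CUDA_VISIBLE_DEVICES"] = str(
            node_comm.Get_rank() % num_gpus_per_node)
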

def _check_isl_version() -> None:
    """
    Check that we run with a non-GMP ISL version.

    In general, ISL can be built with one of three arithmetic backends:
    imath-32, GMP, and imath, in descending order of speed.
    Since https://github.com/conda-forge/isl-feedstock only offers imath-32
    or GMP builds, we can check for the presence of GMP-only symbols in the
    loaded library to determine which of the two we are running with.
    """
    import ctypes
    import islpy  # type: ignore[import-untyped]

    try:
        ctypes.cdll.LoadLibrary(islpy._isl.__file__).isl_val_get_num_gmp
    except AttributeError:
        # We are running with imath or imath-32.
        pass
    else:
        from warnings import warn
        warn("Running with the GMP version of ISL, which is considerably "
             "slower than imath-32. Please install a faster ISL version with "
             "a command such as 'conda install \"isl * imath32_*\"' .")


def _check_mpi4py_version() -> None:
    import mpi4py

    # Compare the major version numerically; a plain string comparison would
    # misorder versions such as "10" vs. "4".
    if int(mpi4py.__version__.split(".")[0]) < 4:
        from warnings import warn
        warn(f"mpi4py version {mpi4py.__version__} does not support pkl5 "
             "scatter. This may lead to errors when distributing large meshes. "
             "Please upgrade to mpi4py version 4 or newer.")
    else:
        logger.info(f"Using mpi4py version {mpi4py.__version__} with pkl5 "
                    "scatter support.")

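
def _example_pkl5_scatter() -> None:
    # Illustrative sketch only; the function name and payload are
    # hypothetical. mpi4py's pkl5 module streams large pickled messages,
    # avoiding the plain-pickle message-size limits that
    # _check_mpi4py_version() warns about; its scatter support requires
    # mpi4py >= 4.
    from mpi4py import MPI
    from mpi4py.util import pkl5

    comm = pkl5.Intracomm(MPI.COMM_WORLD)
    parts = ([f"mesh part {i}" for i in range(comm.Get_size())]
             if comm.Get_rank() == 0 else None)
    my_part = comm.scatter(parts, root=0)
    print(f"rank {comm.Get_rank()} received: {my_part}")
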

def mpi_entry_point(func) -> Callable:
    """
    Return a decorator that designates a function as the "main" function for MPI.

    Declares that all MPI code that will be executed on the current process is
    contained within *func*. Calls `MPI_Init()`/`MPI_Init_thread()` and sets up a
    hook to call `MPI_Finalize()` on exit.
    """
    @wraps(func)
    def wrapped_func(*args, **kwargs) -> None:
        # We enforce this so that an exception raised on one rank terminates
        # all ranks.
        if "mpi4py.run" not in sys.modules:
            raise RuntimeError("Must run MPI scripts via mpi4py (i.e., "
                               "'python -m mpi4py <args>').")

        # We enforce this so that we can work around certain interoperability
        # issues, including the hwloc/mprobe ones below.
        if "mpi4py.MPI" in sys.modules:
            raise RuntimeError("mpi4py.MPI imported before designated MPI "
                               "entry point. Check for prior imports.")

        # Avoid hwloc version conflicts by forcing pocl to load before mpi4py
        # (don't ask). See https://github.com/illinois-ceesd/mirgecom/pull/169
        # for details.
        import pyopencl as cl
        cl.get_platforms()

        # Avoid https://github.com/illinois-ceesd/mirgecom/issues/132 on
        # some MPI runtimes. This must be set *before* the first import
        # of mpi4py.MPI.
        import mpi4py
        mpi4py.rc.recv_mprobe = False

        # Runs MPI_Init()/MPI_Init_thread() and sets up a hook for
        # MPI_Finalize() on exit.
        from mpi4py import MPI  # noqa

        _check_gpu_oversubscription()
        _check_cache_dirs()
        _check_isl_version()
        _check_mpi4py_version()

        func(*args, **kwargs)

    return wrapped_func
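

# Illustrative sketch only: a minimal driver using mpi_entry_point. The
# function and script names are hypothetical; launch with, e.g.,
# 'mpiexec -n 2 python -m mpi4py my_driver.py'. Merely defining the
# decorated function is harmless; the checks above run when it is called.
@mpi_entry_point
def _example_main() -> None:
    from mpi4py import MPI
    print(f"Hello from rank {MPI.COMM_WORLD.Get_rank()} "
          f"of {MPI.COMM_WORLD.Get_size()}")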


def pudb_remote_debug_on_single_rank(func: Callable) -> Callable:
    """
    Designate a function *func* to be debugged with ``pudb`` on rank 0.

    To use it, add this decorator to the main function that you want to debug,
    after the :func:`mpi_entry_point` decorator:

    .. code-block:: python

        @mpi_entry_point
        @pudb_remote_debug_on_single_rank
        def main(...)

    Then, you can connect to pudb on rank 0 by running
    ``telnet 127.0.0.1 6899`` in a separate terminal and continue to use pudb
    as normal.
    """
    @wraps(func)
    def wrapped_func(*args: Any, **kwargs: Any) -> None:
        # pylint: disable=import-error
        from pudb.remote import debug_remote_on_single_rank
        from mpi4py import MPI
        debug_remote_on_single_rank(MPI.COMM_WORLD, 0, func, *args, **kwargs)

    return wrapped_func


def enable_rank_labeled_print() -> None:
    """Enable prepending the rank number to every message printed with print()."""
    def rank_print(*args, **kwargs):
        """Print a message prefixed with the number of the calling rank."""
        if "mpi4py.MPI" in sys.modules:
            from mpi4py import MPI
            out_str = f"[{MPI.COMM_WORLD.Get_rank()}]"
        else:
            out_str = "[ ]"

        __builtins__["oldprint"](out_str, *args, **kwargs)

    if "oldprint" not in __builtins__:  # type: ignore[operator]
        __builtins__["oldprint"] = __builtins__["print"]  # type: ignore[index]
    __builtins__["print"] = rank_print
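

# Illustrative sketch only; the function name is hypothetical. After
# enable_rank_labeled_print(), every print() is prefixed with the calling
# rank, which helps untangle interleaved output from multiple ranks.
def _example_rank_labeled_print() -> None:
    enable_rank_labeled_print()
    print("starting step 0")  # on rank 0, prints "[0] starting step 0"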