Source code for mirgecom.profiling

"""An array context with kernel profiling capabilities."""

__copyright__ = """
Copyright (C) 2020 University of Illinois Board of Trustees
"""

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

from meshmode.array_context import PyOpenCLArrayContext
import pyopencl as cl
from pytools.py_codegen import PythonFunctionGenerator
import loopy as lp
import numpy as np
from dataclasses import dataclass
import pytools
from logpyle import LogManager
from mirgecom.logging_quantities import KernelProfile
from mirgecom.utils import StatisticsAccumulator

from typing import List, Dict, Optional

__doc__ = """
.. autoclass:: PyOpenCLProfilingArrayContext
.. autoclass:: SingleCallKernelProfile
.. autoclass:: MultiCallKernelProfile
"""


[docs] @dataclass class SingleCallKernelProfile: """Class to hold the results of a single kernel execution.""" time: int flops: int bytes_accessed: int footprint_bytes: Optional[int]
[docs] @dataclass class MultiCallKernelProfile: """Class to hold the results of multiple kernel executions.""" num_calls: int time: StatisticsAccumulator flops: StatisticsAccumulator bytes_accessed: StatisticsAccumulator footprint_bytes: StatisticsAccumulator
@dataclass class ProfileEvent: """Holds a profile event that has not been collected by the profiler yet.""" cl_event: cl._cl.Event translation_unit: lp.TranslationUnit args_tuple: tuple
[docs] class PyOpenCLProfilingArrayContext(PyOpenCLArrayContext): """An array context that profiles OpenCL kernel executions. .. automethod:: tabulate_profiling_data .. automethod:: call_loopy .. automethod:: get_profiling_data_for_kernel .. automethod:: reset_profiling_data_for_kernel Inherits from :class:`arraycontext.PyOpenCLArrayContext`. .. note:: Profiling of :mod:`pyopencl` kernels (that is, kernels that do not get called through :meth:`call_loopy`) is restricted to a single instance of this class. If there are multiple instances, only the first one created will be able to profile these kernels. """ def __init__(self, queue, allocator=None, logmgr: Optional[LogManager] = None) -> None: super().__init__(queue, allocator) if not queue.properties & cl.command_queue_properties.PROFILING_ENABLE: raise RuntimeError("Profiling was not enabled in the command queue. " "Please create the queue with " "cl.command_queue_properties.PROFILING_ENABLE.") # list of ProfileEvents that haven't been transferred to profiled results yet self.profile_events: List[ProfileEvent] = [] # dict of kernel name -> list of SingleCallKernelProfile results self.profile_results: Dict[str, List[SingleCallKernelProfile]] = {} # dict of (Kernel, args_tuple) -> calculated number of flops, bytes self.kernel_stats: Dict[lp.TranslationUnit, Dict[tuple, SingleCallKernelProfile]] = {} self.logmgr = logmgr # Only store the first kernel exec hook for elwise kernels if cl.array.ARRAY_KERNEL_EXEC_HOOK is None: cl.array.ARRAY_KERNEL_EXEC_HOOK = self.array_kernel_exec_hook def clone(self): """Return a semantically equivalent but distinct version of *self*.""" from warnings import warn warn("Cloned PyOpenCLProfilingArrayContexts can not " "profile elementwise PyOpenCL kernels.") return type(self)(self.queue, self.allocator, self.logmgr) def __del__(self): """Release resources and undo monkey patching.""" del self.profile_events[:] self.profile_results.clear() self.kernel_stats.clear() def array_kernel_exec_hook(self, knl, queue, gs, ls, *actual_args, wait_for): """Extract data from the elementwise array kernel.""" evt = knl(queue, gs, ls, *actual_args, wait_for=wait_for) name = knl.function_name args_tuple = tuple( (arg.size) for arg in actual_args if isinstance(arg, cl.array.Array)) try: self.kernel_stats[knl][args_tuple] except KeyError: nbytes = 0 nops = 0 for arg in actual_args: if isinstance(arg, cl.array.Array): nbytes += arg.size * arg.dtype.itemsize nops += arg.size res = SingleCallKernelProfile(time=0, flops=nops, bytes_accessed=nbytes, footprint_bytes=nbytes) self.kernel_stats.setdefault(knl, {})[args_tuple] = res if self.logmgr and f"{name}_time" not in self.logmgr.quantity_data: self.logmgr.add_quantity(KernelProfile(self, name)) self.profile_events.append(ProfileEvent(evt, knl, args_tuple)) return evt def _wait_and_transfer_profile_events(self) -> None: # First, wait for completion of all events if self.profile_events: cl.wait_for_events([pevt.cl_event for pevt in self.profile_events]) # Then, collect all events and store them for t in self.profile_events: t_unit = t.translation_unit if isinstance(t_unit, lp.TranslationUnit): name = t_unit.default_entrypoint.name else: # It's actually a cl.Kernel name = t_unit.function_name r = self._get_kernel_stats(t_unit, t.args_tuple) time = t.cl_event.profile.end - t.cl_event.profile.start new = SingleCallKernelProfile(time, r.flops, r.bytes_accessed, r.footprint_bytes) self.profile_results.setdefault(name, []).append(new) self.profile_events = []
[docs] def get_profiling_data_for_kernel(self, kernel_name: str) \ -> MultiCallKernelProfile: """Return profiling data for kernel `kernel_name`.""" self._wait_and_transfer_profile_events() time = StatisticsAccumulator(scale_factor=1e-9) gflops = StatisticsAccumulator(scale_factor=1e-9) gbytes_accessed = StatisticsAccumulator(scale_factor=1e-9) fprint_gbytes = StatisticsAccumulator(scale_factor=1e-9) num_calls = 0 if kernel_name in self.profile_results: knl_results = self.profile_results[kernel_name] num_calls = len(knl_results) for r in knl_results: time.add_value(r.time) gflops.add_value(r.flops) gbytes_accessed.add_value(r.bytes_accessed) if r.footprint_bytes is not None: fprint_gbytes.add_value(r.footprint_bytes) return MultiCallKernelProfile(num_calls, time, gflops, gbytes_accessed, fprint_gbytes)
[docs] def reset_profiling_data_for_kernel(self, kernel_name: str) -> None: """Reset profiling data for kernel `kernel_name`.""" self.profile_results.pop(kernel_name, None)
[docs] def tabulate_profiling_data(self) -> pytools.Table: """Return a :class:`pytools.Table` with the profiling results.""" self._wait_and_transfer_profile_events() tbl = pytools.Table() # Table header tbl.add_row(("Function", "Calls", "Time_sum [s]", "Time_min [s]", "Time_avg [s]", "Time_max [s]", "GFlops/s_min", "GFlops/s_avg", "GFlops/s_max", "BWAcc_min [GByte/s]", "BWAcc_mean [GByte/s]", "BWAcc_max [GByte/s]", "BWFoot_min [GByte/s]", "BWFoot_mean [GByte/s]", "BWFoot_max [GByte/s]", "Intensity (flops/byte)")) # Precision of results g = ".4g" total_calls = 0 total_time = 0.0 for knl in self.profile_results.keys(): r = self.get_profiling_data_for_kernel(knl) # Extra statistics that are derived from the main values returned by # self.get_profiling_data_for_kernel(). These are already GFlops/s and # GBytes/s respectively, so no need to scale them. flops_per_sec = StatisticsAccumulator() bandwidth_access = StatisticsAccumulator() knl_results = self.profile_results[knl] for knl_res in knl_results: flops_per_sec.add_value(knl_res.flops/knl_res.time) bandwidth_access.add_value(knl_res.bytes_accessed/knl_res.time) total_calls += r.num_calls t_sum = r.time.sum() if t_sum is not None: total_time += t_sum time_sum = f"{t_sum:{g}}" time_min = f"{r.time.min():{g}}" time_avg = f"{r.time.mean():{g}}" time_max = f"{r.time.max():{g}}" if r.footprint_bytes.sum() is not None: fprint_mean = f"{r.footprint_bytes.mean():{g}}" fprint_min = f"{r.footprint_bytes.min():{g}}" fprint_max = f"{r.footprint_bytes.max():{g}}" else: fprint_mean = "--" fprint_min = "--" fprint_max = "--" flop_sum = r.flops.sum() bytes_accessed_sum = r.bytes_accessed.sum() if (bytes_accessed_sum is not None and flop_sum is not None and flop_sum > 0): bytes_per_flop_mean = f"{bytes_accessed_sum / flop_sum:{g}}" flops_per_sec_min = f"{flops_per_sec.min():{g}}" flops_per_sec_mean = f"{flops_per_sec.mean():{g}}" flops_per_sec_max = f"{flops_per_sec.max():{g}}" else: bytes_per_flop_mean = "--" flops_per_sec_min = "--" flops_per_sec_mean = "--" flops_per_sec_max = "--" bandwidth_access_min = f"{bandwidth_access.min():{g}}" bandwidth_access_mean = f"{bandwidth_access.sum():{g}}" bandwidth_access_max = f"{bandwidth_access.max():{g}}" tbl.add_row((knl, r.num_calls, time_sum, time_min, time_avg, time_max, flops_per_sec_min, flops_per_sec_mean, flops_per_sec_max, bandwidth_access_min, bandwidth_access_mean, bandwidth_access_max, fprint_min, fprint_mean, fprint_max, bytes_per_flop_mean)) tbl.add_row(("Total", total_calls, f"{total_time:{g}}") + tuple(["--"] * 13)) return tbl
def _get_kernel_stats(self, t_unit: lp.TranslationUnit, args_tuple: tuple) \ -> SingleCallKernelProfile: return self.kernel_stats[t_unit][args_tuple] def _cache_kernel_stats(self, t_unit: lp.TranslationUnit, kwargs: dict) \ -> tuple: """Generate the kernel stats for a program with its args.""" args_tuple = tuple( (key, value.shape) if hasattr(value, "shape") else (key, value) for key, value in kwargs.items()) # Are kernel stats already in the cache? try: self.kernel_stats[t_unit][args_tuple] return args_tuple except KeyError: # If not, calculate and cache the stats ep_name = t_unit.default_entrypoint.name executor = t_unit.target.get_kernel_executor(t_unit, self.queue, entrypoint=ep_name) info = executor.translation_unit_info( ep_name, executor.arg_to_dtype_set(kwargs)) typed_t_unit = executor.get_typed_and_scheduled_translation_unit( ep_name, executor.arg_to_dtype_set(kwargs)) kernel = typed_t_unit[ep_name] idi = info.implemented_data_info param_dict = kwargs.copy() param_dict.update({k: None for k in kernel.arg_dict.keys() if k not in param_dict}) param_dict.update( {d.name: None for d in idi if d.name not in param_dict}) # Generate the wrapper code wrapper = executor.get_wrapper_generator() gen = PythonFunctionGenerator("_mcom_gen_args_profile", list(param_dict)) wrapper.generate_integer_arg_finding_from_shapes(gen, kernel, idi) wrapper.generate_integer_arg_finding_from_offsets(gen, kernel, idi) wrapper.generate_integer_arg_finding_from_strides(gen, kernel, idi) param_names = kernel.all_params() gen("return {%s}" % ", ".join( f"{repr(name)}: {name}" for name in param_names)) # Run the wrapper code, save argument values in domain_params domain_params = gen.get_picklable_function()(**param_dict) # Get flops/memory statistics op_map = lp.get_op_map(typed_t_unit, subgroup_size="guess") bytes_accessed = lp.get_mem_access_map( typed_t_unit, subgroup_size="guess") \ .to_bytes().eval_and_sum(domain_params) flops = op_map.filter_by(dtype=[np.float32, np.float64]).eval_and_sum( domain_params) # Footprint gathering is not yet available in loopy with # kernel callables: # https://github.com/inducer/loopy/issues/399 if 0: try: footprint = lp.gather_access_footprint_bytes(typed_t_unit) footprint_bytes = sum(footprint[k].eval_with_dict(domain_params) for k in footprint) except lp.symbolic.UnableToDetermineAccessRange: footprint_bytes = None else: footprint_bytes = None res = SingleCallKernelProfile( time=0, flops=flops, bytes_accessed=bytes_accessed, footprint_bytes=footprint_bytes) self.kernel_stats.setdefault(t_unit, {})[args_tuple] = res if self.logmgr: if f"{ep_name}_time" not in self.logmgr.quantity_data: self.logmgr.add_quantity(KernelProfile(self, ep_name)) return args_tuple
[docs] def call_loopy(self, t_unit, **kwargs) -> dict: """Execute the loopy kernel and profile it.""" try: t_unit = self._loopy_transform_cache[t_unit] except KeyError: orig_t_unit = t_unit t_unit = self.transform_loopy_program(t_unit) self._loopy_transform_cache[orig_t_unit] = t_unit del orig_t_unit evt, result = t_unit(self.queue, **kwargs, allocator=self.allocator) if self._wait_event_queue_length is not False: prg_name = t_unit.default_entrypoint.name wait_event_queue = self._kernel_name_to_wait_event_queue.setdefault( prg_name, []) wait_event_queue.append(evt) if len(wait_event_queue) > self._wait_event_queue_length: wait_event_queue.pop(0).wait() # Generate the stats here so we don't need to carry around the kwargs args_tuple = self._cache_kernel_stats(t_unit, kwargs) self.profile_events.append(ProfileEvent(evt, t_unit, args_tuple)) return result