r""":mod:`mirgecom.diffusion` computes the diffusion operator.
Diffusion equation:
.. math::
\frac{\partial u}{\partial t} = \nabla \cdot (\boldsymbol{\kappa} \nabla u)
where:
- conserved variable $u$
- scalar (isotropic) or diagonal-tensor (orthotropic) $\boldsymbol{\kappa}$
In the orthotropic case, the product $\boldsymbol{\kappa} \nabla u$ is treated
as a Hadamard product of arrays rather than a matrix-array multiplication.
Fully anisotropic materials are not currently handled.
Due to the possibility of big differences in the magnitude of diffusivity
coefficients, such as thermal conductivity in air and a solid, an harmonic
average can be used to increase robustness and avoid numerical instabilities.
In this case, to ensure flux continuity,
.. math::
F = \kappa^- \nabla T^- = \kappa^+ \nabla T^+
= \bar{\kappa} \left( \frac{\nabla T^- + \nabla T^+}{2} \right)
where
.. math::
\bar{\kappa} = \frac{2 \kappa^-_{ii} \kappa^+_{ii}}
{\kappa^-_{ii} + \kappa^+_{ii}}
with $\kappa_{ii}$ being either the individual components of the diffusivity
array (orthotropic material) or a single scalar (isotropic).
Flux functions
^^^^^^^^^^^^^^
.. autofunction:: diffusion_flux
.. autofunction:: grad_facial_flux_central
.. autofunction:: grad_facial_flux_weighted
.. autofunction:: diffusion_facial_flux_central
.. autofunction:: diffusion_facial_flux_harmonic
RHS Evaluation
^^^^^^^^^^^^^^
.. autofunction:: grad_operator
.. autofunction:: diffusion_operator
Boundary conditions
^^^^^^^^^^^^^^^^^^^
.. autoclass:: DiffusionBoundary
.. autoclass:: DirichletDiffusionBoundary
.. autoclass:: NeumannDiffusionBoundary
.. autoclass:: RobinDiffusionBoundary
.. autoclass:: PrescribedFluxDiffusionBoundary
.. autoclass:: DummyDiffusionBoundary
"""
__copyright__ = """
Copyright (C) 2023 University of Illinois Board of Trustees
"""
__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import abc
from functools import partial
import numpy as np
from pytools.obj_array import make_obj_array, obj_array_vectorize_n_args
from meshmode.discretization.connection import FACE_RESTR_ALL # noqa
from grudge.dof_desc import (
DD_VOLUME_ALL,
VolumeDomainTag,
DISCR_TAG_BASE,
)
from grudge.trace_pair import (
TracePair,
interior_trace_pairs,
tracepair_with_discr_tag,
)
from grudge import op
from mirgecom.math import harmonic_mean
from mirgecom.utils import normalize_boundaries
[docs]
def grad_facial_flux_central(kappa_tpair, u_tpair, normal):
r"""Compute the numerical flux for $\nabla u$.
Uses a simple average of the two sides' values:
.. math::
F = -\frac{u^- + u^+}{2} \hat{n}.
"""
return -u_tpair.avg * normal
[docs]
def grad_facial_flux_weighted(kappa_tpair, u_tpair, normal):
r"""Compute the numerical flux for $\nabla u$ using a weighted average.
Weights each side's value by the corresponding thermal conductivity $\kappa$:
.. math::
F = -\frac{\kappa^- u^- + \kappa^+ u^+}{\kappa^- + \kappa^+} \hat{n}
For an orthotropic material, uses an averaging wrt to the normal component
according to
.. math::
\kappa = n^T \cdot \kappa \cdot n
"""
actx = u_tpair.int.array_context
# If any of the coefficients are orthotropic, weight by the normal.
if isinstance(kappa_tpair.int, np.ndarray):
kappa_int = np.dot(normal, kappa_tpair.int*normal)
else:
kappa_int = kappa_tpair.int
if isinstance(kappa_tpair.ext, np.ndarray):
kappa_ext = np.dot(normal, kappa_tpair.ext*normal)
else:
kappa_ext = kappa_tpair.ext
kappa_sum = actx.np.where(
actx.np.greater(kappa_int + kappa_ext, 0*kappa_int),
kappa_int + kappa_ext,
0*kappa_int + 1)
return -(u_tpair.int*kappa_int + u_tpair.ext*kappa_ext)/kappa_sum * normal
[docs]
def diffusion_flux(kappa, grad_u):
r"""
Compute the diffusive flux $-\kappa \nabla u$.
Parameters
----------
kappa: float, numpy.ndarray or :class:`meshmode.dof_array.DOFArray`
The thermal conductivity.
grad_u: numpy.ndarray
Gradient of the state variable *u*.
Returns
-------
meshmode.dof_array.DOFArray
The diffusive flux.
"""
return -kappa * grad_u
[docs]
def diffusion_facial_flux_central(
kappa_tpair, u_tpair, grad_u_tpair, lengthscales_tpair, normal, *,
penalty_amount=None):
r"""Compute the numerical flux for $\nabla \cdot (\kappa \nabla u)$.
Uses a simple average of the two sides' values:
.. math::
F = -\frac{\kappa^- \nabla u^- + \kappa^+ \nabla u^+}{2} \cdot \hat{n}
- \tau (u^+ - u^-).
The amount of penalization $\tau$ is given by
.. math::
\tau = \frac{\alpha \bar{\kappa}_{avg}}{l},
where $\alpha$ is a user-definied value, $l$ is the element characteristic
lengthscale and $\bar{\kappa}_{avg}$ is the averaged value, considering
both isotropic (scalar) or orthotropic (array) cases. In the latter, the
normal value is used for the penalization (see [Ern_2008]_).
"""
if penalty_amount is None:
# FIXME: After verifying the form of the penalty term, figure out what value
# makes sense to use as a default here
penalty_amount = 0.05
flux_tpair = TracePair(grad_u_tpair.dd,
interior=diffusion_flux(kappa_tpair.int, grad_u_tpair.int),
exterior=diffusion_flux(kappa_tpair.ext, grad_u_tpair.ext))
flux_without_penalty = np.dot(flux_tpair.avg, normal)
# TODO: Verify that this is the correct form for the penalty term
if isinstance(kappa_tpair.avg, np.ndarray):
kappa_avg_normal = np.dot(normal, kappa_tpair.avg.int*normal)
tau = penalty_amount*kappa_avg_normal/lengthscales_tpair.avg
else:
tau = penalty_amount*kappa_tpair.avg/lengthscales_tpair.avg
return flux_without_penalty - tau*(u_tpair.ext - u_tpair.int)
[docs]
def diffusion_facial_flux_harmonic(
kappa_tpair, u_tpair, grad_u_tpair, lengthscales_tpair, normal, *,
penalty_amount=None):
r"""Compute the numerical flux for $\nabla \cdot (\kappa \nabla u)$.
Uses a modified average of the two sides' values that replaces $\kappa^-$
and $\kappa^+$ with their harmonic mean, plus a penalization term
.. math::
F = -\frac{2 \kappa_{ii}^- \kappa_{ii}^+}{\kappa_{ii}^- + \kappa_{ii}^+}
\frac{\nabla u^- + \nabla u^+}{2} \cdot \hat{n} - \tau (u^+ - u^-).
The amount of penalization $\tau$ is given by
.. math::
\tau = \frac{\alpha \bar{\kappa}_{harm}}{l},
where $\alpha$ is a user-defined value, $l$ is the element characteristic
lengthscale and $\bar{\kappa}_{harm}$ is the harmonic mean, considering
both isotropic (scalar) or orthotropic (array) cases. In the latter, the
normal value is used for the penalization (see [Ern_2008]_).
"""
if penalty_amount is None:
# FIXME: After verifying the form of the penalty term, figure out what value
# makes sense to use as a default here
penalty_amount = 0.05
kappa_harmonic_mean = harmonic_mean(kappa_tpair.int, kappa_tpair.ext)
flux_tpair = TracePair(grad_u_tpair.dd,
interior=diffusion_flux(kappa_harmonic_mean, grad_u_tpair.int),
exterior=diffusion_flux(kappa_harmonic_mean, grad_u_tpair.ext))
flux_without_penalty = np.dot(flux_tpair.avg, normal)
# TODO: Verify that this is the correct form for the penalty term
if isinstance(kappa_harmonic_mean, np.ndarray):
# if orthotropic, weight by the normal
kappa_mean_normal = np.dot(normal, kappa_harmonic_mean*normal)
tau = penalty_amount*kappa_mean_normal/lengthscales_tpair.avg
else:
tau = penalty_amount*kappa_harmonic_mean/lengthscales_tpair.avg
return flux_without_penalty - tau*(u_tpair.ext - u_tpair.int)
[docs]
class DiffusionBoundary(metaclass=abc.ABCMeta):
"""
Diffusion boundary base class.
.. automethod:: get_grad_flux
.. automethod:: get_diffusion_flux
"""
[docs]
@abc.abstractmethod
def get_grad_flux(
self, dcoll, dd_bdry, kappa_minus, u_minus, *,
numerical_flux_func=grad_facial_flux_weighted):
"""Compute the flux for grad(u) on the boundary *dd_bdry*."""
raise NotImplementedError
[docs]
@abc.abstractmethod
def get_diffusion_flux(
self, dcoll, dd_bdry, kappa_minus, u_minus, grad_u_minus,
lengthscales_minus, *, penalty_amount=None,
numerical_flux_func=diffusion_facial_flux_harmonic):
"""Compute the flux for diff(u) on the boundary *dd_bdry*."""
raise NotImplementedError
[docs]
class DirichletDiffusionBoundary(DiffusionBoundary):
r"""
Dirichlet boundary condition for the diffusion operator.
For the boundary condition $u|_\Gamma = f$, uses external data
.. math::
u^+ &= 2 f - u^-
(\nabla u)^+ &= (\nabla u)^-
to compute boundary fluxes as shown in [Hesthaven_2008]_, Section 7.1.
.. automethod:: __init__
.. automethod:: get_grad_flux
.. automethod:: get_diffusion_flux
"""
[docs]
def __init__(self, value):
"""
Initialize the boundary condition.
Parameters
----------
value: float or meshmode.dof_array.DOFArray
the value(s) of $f$ along the boundary
"""
self.value = value
[docs]
def get_grad_flux(
self, dcoll, dd_bdry, kappa_minus, u_minus, *,
numerical_flux_func=grad_facial_flux_weighted): # noqa: D102
actx = u_minus.array_context
kappa_tpair = TracePair(dd_bdry,
interior=kappa_minus,
exterior=kappa_minus)
u_tpair = TracePair(dd_bdry,
interior=u_minus,
exterior=2*self.value-u_minus)
normal = actx.thaw(dcoll.normal(dd_bdry))
return numerical_flux_func(kappa_tpair, u_tpair, normal)
[docs]
def get_diffusion_flux(
self, dcoll, dd_bdry, kappa_minus, u_minus, grad_u_minus,
lengthscales_minus, *, penalty_amount=None,
numerical_flux_func=diffusion_facial_flux_harmonic): # noqa: D102
actx = u_minus.array_context
kappa_tpair = TracePair(dd_bdry,
interior=kappa_minus,
exterior=kappa_minus)
u_tpair = TracePair(dd_bdry,
interior=u_minus,
exterior=2*self.value-u_minus)
grad_u_tpair = TracePair(dd_bdry,
interior=grad_u_minus,
exterior=grad_u_minus)
lengthscales_tpair = TracePair(
dd_bdry, interior=lengthscales_minus, exterior=lengthscales_minus)
normal = actx.thaw(dcoll.normal(dd_bdry))
return numerical_flux_func(
kappa_tpair, u_tpair, grad_u_tpair, lengthscales_tpair, normal,
penalty_amount=penalty_amount)
[docs]
class NeumannDiffusionBoundary(DiffusionBoundary):
r"""
Neumann boundary condition for the diffusion operator.
For the boundary condition $(\nabla u \cdot \mathbf{\hat{n}})|_\Gamma = g$, uses
external data
.. math::
u^+ = u^-
when computing the boundary fluxes for $\nabla u$, and uses
.. math::
(-\kappa \nabla u\cdot\mathbf{\hat{n}})|_\Gamma &=
-\kappa^- (\nabla u\cdot\mathbf{\hat{n}})|_\Gamma
&= -\kappa^- g
when computing the boundary fluxes for $\nabla \cdot (\kappa \nabla u)$.
.. automethod:: __init__
.. automethod:: get_grad_flux
.. automethod:: get_diffusion_flux
"""
[docs]
def __init__(self, value):
"""
Initialize the boundary condition.
Parameters
----------
value: float or meshmode.dof_array.DOFArray
the value(s) of $g$ along the boundary
"""
self.value = value
[docs]
def get_grad_flux(
self, dcoll, dd_bdry, kappa_minus, u_minus, *,
numerical_flux_func=grad_facial_flux_weighted): # noqa: D102
actx = u_minus.array_context
kappa_tpair = TracePair(dd_bdry,
interior=kappa_minus,
exterior=kappa_minus)
u_tpair = TracePair(dd_bdry,
interior=u_minus,
exterior=u_minus)
normal = actx.thaw(dcoll.normal(dd_bdry))
return numerical_flux_func(kappa_tpair, u_tpair, normal)
[docs]
def get_diffusion_flux(
self, dcoll, dd_bdry, kappa_minus, u_minus, grad_u_minus,
lengthscales_minus, *, penalty_amount=None,
numerical_flux_func=diffusion_facial_flux_harmonic): # noqa: D102
actx = u_minus.array_context
kappa_tpair = TracePair(dd_bdry,
interior=kappa_minus,
exterior=kappa_minus)
u_tpair = TracePair(dd_bdry,
interior=u_minus,
exterior=u_minus)
normal = actx.thaw(dcoll.normal(dd_bdry))
grad_u_tpair = TracePair(dd_bdry,
interior=grad_u_minus,
exterior=(
grad_u_minus
+ 2 * (self.value - np.dot(grad_u_minus, normal)) * normal))
lengthscales_tpair = TracePair(
dd_bdry, interior=lengthscales_minus, exterior=lengthscales_minus)
return numerical_flux_func(
kappa_tpair, u_tpair, grad_u_tpair, lengthscales_tpair, normal,
penalty_amount=penalty_amount)
[docs]
class RobinDiffusionBoundary(DiffusionBoundary):
r"""
Robin boundary condition for the diffusion operator.
The non-homogeneous Robin boundary condition is a linear combination of
$u$ and its gradient $\nabla u$, given by
.. math::
(\alpha u - \kappa \nabla u \cdot \mathbf{\hat{n}})|_\Gamma =
\alpha u_{ref}.
where $u_{ref}$ is the reference value of $u$ for $x \to \infty$ and
$\alpha$ is the weight for $u$. The gradient weight $\kappa$ is the
conductivity (thermal) or the diffusivity (species).
The current implementation uses external data
.. math::
u^+ = u^-
when computing the boundary fluxes for $\nabla u$, and
.. math::
\nabla u\cdot\mathbf{\hat{n}} |_\Gamma =
\frac{\alpha}{\kappa}(u_{ref} - u^-)
when computing the boundary fluxes for $\nabla \cdot (\kappa \nabla u)$.
.. automethod:: __init__
.. automethod:: get_grad_flux
.. automethod:: get_diffusion_flux
"""
[docs]
def __init__(self, u_ref, alpha):
"""
Initialize the boundary condition.
Parameters
----------
u_ref: float or meshmode.dof_array.DOFArray
the reference value(s) of $u$ along the boundary
alpha: float or meshmode.dof_array.DOFArray
the weight for the variable $u$ at the boundary
"""
self.u_ref = u_ref
self.alpha = alpha
[docs]
def get_grad_flux(
self, dcoll, dd_bdry, kappa_minus, u_minus, *,
numerical_flux_func=grad_facial_flux_weighted): # noqa: D102
actx = u_minus.array_context
kappa_tpair = TracePair(dd_bdry,
interior=kappa_minus,
exterior=kappa_minus)
u_tpair = TracePair(dd_bdry,
interior=u_minus,
exterior=u_minus)
normal = actx.thaw(dcoll.normal(dd_bdry))
return numerical_flux_func(kappa_tpair, u_tpair, normal)
[docs]
def get_diffusion_flux(
self, dcoll, dd_bdry, kappa_minus, u_minus, grad_u_minus,
lengthscales_minus, *, penalty_amount=None,
numerical_flux_func=diffusion_facial_flux_harmonic): # noqa: D102
actx = u_minus.array_context
kappa_tpair = TracePair(dd_bdry,
interior=kappa_minus,
exterior=kappa_minus)
u_tpair = TracePair(dd_bdry,
interior=u_minus,
exterior=u_minus)
normal = actx.thaw(dcoll.normal(dd_bdry))
dudn_bc = self.alpha * (self.u_ref - u_minus)/kappa_minus
grad_u_tpair = TracePair(dd_bdry,
interior=grad_u_minus,
exterior=(
grad_u_minus
+ 2 * (dudn_bc - np.dot(grad_u_minus, normal)) * normal))
lengthscales_tpair = TracePair(
dd_bdry, interior=lengthscales_minus, exterior=lengthscales_minus)
return numerical_flux_func(
kappa_tpair, u_tpair, grad_u_tpair, lengthscales_tpair, normal,
penalty_amount=penalty_amount)
[docs]
class PrescribedFluxDiffusionBoundary(DiffusionBoundary):
r"""
Prescribed flux boundary condition for the diffusion operator.
For the boundary condition $(\nabla u \cdot \mathbf{\hat{n}})|_\Gamma$, uses
external data
.. math::
u^+ = u^-
when computing the boundary fluxes for $\nabla u$, and applies directly
the prescribed flux $g$ when computing $\nabla \cdot (\kappa \nabla u)$:
.. math::
f_{presc} \cdot \hat{n}
.. automethod:: __init__
.. automethod:: get_grad_flux
.. automethod:: get_diffusion_flux
"""
[docs]
def __init__(self, value):
"""
Initialize the boundary condition.
Parameters
----------
value: float or meshmode.dof_array.DOFArray
the value(s) of $g$ along the boundary
"""
self.value = value
[docs]
def get_grad_flux(self, dcoll, dd_bdry, kappa_minus, u_minus, *,
numerical_flux_func=grad_facial_flux_weighted): # noqa: D102
actx = u_minus.array_context
kappa_tpair = TracePair(
dd_bdry, interior=kappa_minus, exterior=kappa_minus)
u_tpair = TracePair(
dd_bdry, interior=u_minus, exterior=u_minus)
normal = actx.thaw(dcoll.normal(dd_bdry))
return numerical_flux_func(kappa_tpair, u_tpair, normal)
[docs]
def get_diffusion_flux(
self, dcoll, dd_bdry, kappa_minus, u_minus, grad_u_minus,
lengthscales_minus, *, penalty_amount=None,
numerical_flux_func=diffusion_facial_flux_harmonic): # noqa: D102
actx = u_minus.array_context
# returns the product "flux @ normal"
return actx.np.zeros_like(u_minus) + self.value
[docs]
class DummyDiffusionBoundary(DiffusionBoundary):
"""Dummy boundary condition that duplicates the internal values."""
def get_grad_flux(self, dcoll, dd_bdry, kappa_minus, u_minus, *,
numerical_flux_func): # noqa: D102
actx = u_minus.array_context
kappa_tpair = TracePair(dd_bdry,
interior=kappa_minus,
exterior=kappa_minus)
u_tpair = TracePair(dd_bdry,
interior=u_minus,
exterior=u_minus)
normal = actx.thaw(dcoll.normal(dd_bdry))
return numerical_flux_func(kappa_tpair, u_tpair, normal)
def get_diffusion_flux(self, dcoll, dd_bdry, kappa_minus, u_minus,
grad_u_minus, lengthscales_minus, *,
numerical_flux_func=diffusion_facial_flux_harmonic,
penalty_amount=None): # noqa: D102
actx = u_minus.array_context
kappa_tpair = TracePair(dd_bdry,
interior=kappa_minus,
exterior=kappa_minus)
u_tpair = TracePair(dd_bdry,
interior=u_minus,
exterior=u_minus)
grad_u_tpair = TracePair(dd_bdry,
interior=grad_u_minus,
exterior=grad_u_minus)
lengthscales_tpair = TracePair(
dd_bdry, interior=lengthscales_minus, exterior=lengthscales_minus)
normal = actx.thaw(dcoll.normal(dd_bdry))
return numerical_flux_func(
kappa_tpair, u_tpair, grad_u_tpair, lengthscales_tpair, normal,
penalty_amount=penalty_amount)
class _DiffusionKappaTag:
pass
class _DiffusionStateTag:
pass
class _DiffusionGradTag:
pass
class _DiffusionLengthscalesTag:
pass
[docs]
def grad_operator(
dcoll, kappa, boundaries, u, *, quadrature_tag=DISCR_TAG_BASE,
dd=DD_VOLUME_ALL, comm_tag=None,
numerical_flux_func=grad_facial_flux_weighted,
# Added to avoid repeated computation
# FIXME: See if there's a better way to do this
kappa_tpairs=None,
u_tpairs=None):
r"""
Compute the gradient of *u*.
Parameters
----------
dcoll: grudge.discretization.DiscretizationCollection
the discretization collection to use
kappa: numbers.Number or meshmode.dof_array.DOFArray
the conductivity value(s)
boundaries:
dictionary (or list of dictionaries) mapping boundary tags to
:class:`DiffusionBoundary` instances
u: meshmode.dof_array.DOFArray or numpy.ndarray
the DOF array (or object array of DOF arrays) to which the operator should be
applied
numerical_flux_func:
function that computes the numerical gradient flux
quadrature_tag:
quadrature tag indicating which discretization in *dcoll* to use for
overintegration
dd: grudge.dof_desc.DOFDesc
the DOF descriptor of the discretization on which *u* lives. Must be a volume
on the base discretization.
comm_tag: Hashable
Tag for distributed communication
Returns
-------
grad_u: numpy.ndarray
the gradient of *u*
"""
if isinstance(u, np.ndarray):
if not isinstance(boundaries, list):
raise TypeError("boundaries must be a list if u is an object array")
if len(boundaries) != len(u):
raise TypeError("boundaries must be the same length as u")
return obj_array_vectorize_n_args(
lambda boundaries_i, u_i: grad_operator(
dcoll, kappa, boundaries_i, u_i, quadrature_tag=quadrature_tag,
dd=dd),
make_obj_array(boundaries), u)
actx = u.array_context
boundaries = normalize_boundaries(boundaries)
for bdtag, bdry in boundaries.items():
if not isinstance(bdry, DiffusionBoundary):
raise TypeError(f"Unrecognized boundary type for tag {bdtag}. "
"Must be an instance of DiffusionBoundary.")
if not isinstance(dd.domain_tag, VolumeDomainTag):
raise TypeError("dd must represent a volume")
if dd.discretization_tag != DISCR_TAG_BASE:
raise ValueError("dd must belong to the base discretization")
dd_vol = dd
dd_vol_quad = dd_vol.with_discr_tag(quadrature_tag)
dd_allfaces_quad = dd_vol_quad.trace(FACE_RESTR_ALL)
if kappa_tpairs is None:
kappa_tpairs = interior_trace_pairs(
dcoll, kappa, volume_dd=dd_vol, comm_tag=(_DiffusionKappaTag, comm_tag))
if u_tpairs is None:
u_tpairs = interior_trace_pairs(
dcoll, u, volume_dd=dd_vol, comm_tag=(_DiffusionStateTag, comm_tag))
interp_to_surf_quad = partial(tracepair_with_discr_tag, dcoll, quadrature_tag)
def interior_flux(kappa_tpair, u_tpair):
dd_trace_quad = kappa_tpair.dd.with_discr_tag(quadrature_tag)
kappa_tpair_quad = interp_to_surf_quad(kappa_tpair)
u_tpair_quad = interp_to_surf_quad(u_tpair)
normal_quad = actx.thaw(dcoll.normal(dd_trace_quad))
return op.project(
dcoll, dd_trace_quad, dd_allfaces_quad,
numerical_flux_func(kappa_tpair_quad, u_tpair_quad, normal_quad))
def boundary_flux(bdtag, bdry):
dd_bdry_quad = dd_vol_quad.with_domain_tag(bdtag)
kappa_minus_quad = op.project(dcoll, dd_vol, dd_bdry_quad, kappa)
u_minus_quad = op.project(dcoll, dd_vol, dd_bdry_quad, u)
return op.project(
dcoll, dd_bdry_quad, dd_allfaces_quad,
bdry.get_grad_flux(
dcoll, dd_bdry_quad, kappa_minus_quad, u_minus_quad,
numerical_flux_func=numerical_flux_func))
return op.inverse_mass(
dcoll, dd_vol,
op.weak_local_grad(dcoll, dd_vol, -u)
- # noqa: W504
op.face_mass(
dcoll, dd_allfaces_quad,
sum(
interior_flux(kappa_tpair, u_tpair)
for kappa_tpair, u_tpair in zip(kappa_tpairs, u_tpairs))
+ sum(
boundary_flux(bdtag, bdry)
for bdtag, bdry in boundaries.items())
)
)
[docs]
def diffusion_operator(
dcoll, kappa, boundaries, u, *, return_grad_u=False, penalty_amount=None,
gradient_numerical_flux_func=grad_facial_flux_weighted,
diffusion_numerical_flux_func=diffusion_facial_flux_harmonic,
quadrature_tag=DISCR_TAG_BASE, dd=DD_VOLUME_ALL, comm_tag=None,
# Added to avoid repeated computation
# FIXME: See if there's a better way to do this
grad_u=None):
r"""
Compute the diffusion operator.
The diffusion operator is defined as
$\nabla\cdot(\kappa\nabla u)$, where $\kappa$ is the conductivity and
$u$ is a scalar field.
Parameters
----------
dcoll: grudge.discretization.DiscretizationCollection
the discretization collection to use
kappa: numbers.Number or meshmode.dof_array.DOFArray or numpy.ndarray
the conductivity value(s)
boundaries:
dictionary (or list of dictionaries) mapping boundary domain tags to
:class:`DiffusionBoundary` instances
u: meshmode.dof_array.DOFArray or numpy.ndarray
the DOF array (or object array of DOF arrays) to which the operator should be
applied
return_grad_u: bool
an optional flag indicating whether $\nabla u$ should also be returned
penalty_amount: float
strength parameter for the diffusion flux interior penalty (temporary?);
the default value is 0.05
gradient_numerical_flux_func:
function that computes the numerical gradient flux
diffusion_numerical_flux_func:
function that computes the numerical diffusive flux
quadrature_tag:
quadrature tag indicating which discretization in *dcoll* to use for
overintegration
dd: grudge.dof_desc.DOFDesc
the DOF descriptor of the discretization on which *u* lives. Must be a volume
on the base discretization.
comm_tag: Hashable
Tag for distributed communication
Returns
-------
diff_u: meshmode.dof_array.DOFArray or numpy.ndarray
the diffusion operator applied to *u*
grad_u: numpy.ndarray
the gradient of *u*; only returned if *return_grad_u* is True
"""
if isinstance(u, np.ndarray):
if not isinstance(boundaries, list):
raise TypeError("boundaries must be a list if u is an object array")
if len(boundaries) != len(u):
raise TypeError("boundaries must be the same length as u")
if isinstance(kappa, np.ndarray):
if len(kappa) != len(u):
raise TypeError("kappa must be the same length as u")
else:
kappa = make_obj_array([kappa for _ in range(len(u))])
return obj_array_vectorize_n_args(
lambda kappa_i, boundaries_i, u_i: diffusion_operator(
dcoll, kappa_i, boundaries_i, u_i, return_grad_u=return_grad_u,
penalty_amount=penalty_amount,
gradient_numerical_flux_func=gradient_numerical_flux_func,
diffusion_numerical_flux_func=diffusion_numerical_flux_func,
quadrature_tag=quadrature_tag, dd=dd),
kappa, make_obj_array(boundaries), u)
actx = u.array_context
boundaries = normalize_boundaries(boundaries)
for bdtag, bdry in boundaries.items():
if not isinstance(bdry, DiffusionBoundary):
raise TypeError(f"Unrecognized boundary type for tag {bdtag}. "
"Must be an instance of DiffusionBoundary.")
if not isinstance(dd.domain_tag, VolumeDomainTag):
raise TypeError("dd must represent a volume")
if dd.discretization_tag != DISCR_TAG_BASE:
raise ValueError("dd must belong to the base discretization")
dd_vol = dd
dd_vol_quad = dd_vol.with_discr_tag(quadrature_tag)
dd_allfaces_quad = dd_vol_quad.trace(FACE_RESTR_ALL)
kappa_tpairs = interior_trace_pairs(
dcoll, kappa, volume_dd=dd_vol, comm_tag=(_DiffusionKappaTag, comm_tag))
u_tpairs = interior_trace_pairs(
dcoll, u, volume_dd=dd_vol, comm_tag=(_DiffusionStateTag, comm_tag))
if grad_u is None:
grad_u = grad_operator(
dcoll, kappa, boundaries, u,
numerical_flux_func=gradient_numerical_flux_func,
quadrature_tag=quadrature_tag, dd=dd_vol, comm_tag=comm_tag,
kappa_tpairs=kappa_tpairs, u_tpairs=u_tpairs)
grad_u_tpairs = interior_trace_pairs(
dcoll, grad_u, volume_dd=dd_vol,
comm_tag=(_DiffusionGradTag, comm_tag))
kappa_quad = op.project(dcoll, dd_vol, dd_vol_quad, kappa)
grad_u_quad = op.project(dcoll, dd_vol, dd_vol_quad, grad_u)
from grudge.dt_utils import characteristic_lengthscales
lengthscales = characteristic_lengthscales(actx, dcoll, dd=dd_vol)*(0*u+1)
lengthscales_tpairs = interior_trace_pairs(
dcoll, lengthscales, volume_dd=dd_vol,
comm_tag=(_DiffusionLengthscalesTag, comm_tag))
interp_to_surf_quad = partial(tracepair_with_discr_tag, dcoll, quadrature_tag)
def interior_flux(kappa_tpair, u_tpair, grad_u_tpair, lengthscales_tpair):
dd_trace_quad = u_tpair.dd.with_discr_tag(quadrature_tag)
u_tpair_quad = interp_to_surf_quad(u_tpair)
kappa_tpair_quad = interp_to_surf_quad(kappa_tpair)
grad_u_tpair_quad = interp_to_surf_quad(grad_u_tpair)
lengthscales_tpair_quad = interp_to_surf_quad(lengthscales_tpair)
normal_quad = actx.thaw(dcoll.normal(dd_trace_quad))
return op.project(
dcoll, dd_trace_quad, dd_allfaces_quad,
diffusion_numerical_flux_func(
kappa_tpair_quad, u_tpair_quad, grad_u_tpair_quad,
lengthscales_tpair_quad, normal_quad,
penalty_amount=penalty_amount))
def boundary_flux(bdtag, bdry):
dd_bdry_quad = dd_vol_quad.with_domain_tag(bdtag)
u_minus_quad = op.project(dcoll, dd_vol, dd_bdry_quad, u)
kappa_minus_quad = op.project(dcoll, dd_vol, dd_bdry_quad, kappa)
grad_u_minus_quad = op.project(dcoll, dd_vol, dd_bdry_quad, grad_u)
lengthscales_minus_quad = op.project(
dcoll, dd_vol, dd_bdry_quad, lengthscales)
return op.project(
dcoll, dd_bdry_quad, dd_allfaces_quad,
bdry.get_diffusion_flux(
dcoll, dd_bdry_quad, kappa_minus_quad, u_minus_quad,
grad_u_minus_quad, lengthscales_minus_quad,
penalty_amount=penalty_amount,
numerical_flux_func=diffusion_numerical_flux_func))
diff_u = op.inverse_mass(
dcoll, dd_vol,
op.weak_local_div(dcoll, dd_vol_quad, -kappa_quad*grad_u_quad)
- # noqa: W504
op.face_mass(
dcoll, dd_allfaces_quad,
sum(
interior_flux(kappa_tpair, u_tpair, grad_u_tpair, lengthscales_tpair)
for kappa_tpair, u_tpair, grad_u_tpair, lengthscales_tpair in zip(
kappa_tpairs, u_tpairs, grad_u_tpairs, lengthscales_tpairs))
+ sum(
boundary_flux(bdtag, bdry)
for bdtag, bdry in boundaries.items())
)
)
if return_grad_u:
return diff_u, grad_u
else:
return diff_u