# Source code for verde.spline

```
"""
Biharmonic splines in 2D.
"""
import itertools
from warnings import warn
import numpy as np
from sklearn.utils.validation import check_is_fitted
from .base import n_1d_arrays, BaseGridder, check_fit_input, least_squares
from .coordinates import get_region
from .utils import DummyClient, parse_engine
from .model_selection import cross_val_score
try:
import numba
from numba import jit
except ImportError:
numba = None
from .utils import dummy_jit as jit
# Default arguments for numba.jit
JIT_ARGS = dict(nopython=True, target="cpu", fastmath=True, parallel=True)
[docs]class SplineCV(BaseGridder):
r"""
Cross-validated biharmonic spline interpolation.
Similar to :class:`verde.Spline` but automatically chooses the best *damping* and
*mindist* parameters using cross-validation. Tests all combinations of the given
*dampings* and *mindists* and selects the maximum (or minimum) mean cross-validation
score (i.e., a grid search).
The grid search can optionally run in parallel using :mod:`dask`. To do this, pass
in a :class:`dask.distributed.Client` as the *client* argument. The client can
manage a process or thread pool in your local computer or a remote cluster.
Other cross-validation generators from :mod:`sklearn.model_selection` can be used by
passing them through the *cv* argument.
Parameters
----------
mindists : iterable or 1d array
List (or other iterable) of *mindist* parameter values to try. Can be considered
a minimum distance between the point forces and data points. Needed because the
Green's functions are singular when forces and data points coincide. Acts as a
fudge factor.
dampings : iterable or 1d array
List (or other iterable) of *damping* parameter values to try. Is the positive
damping regularization parameter. Controls how much smoothness is imposed on the
estimated forces. If None, no regularization is used.
force_coords : None or tuple of arrays
The easting and northing coordinates of the point forces. If None (default),
then will be set to the data coordinates the first time
:meth:`~verde.SplineCV.fit` is called.
engine : str
Computation engine for the Jacobian matrix and prediction. Can be ``'auto'``,
``'numba'``, or ``'numpy'``. If ``'auto'``, will use numba if it is installed or
numpy otherwise. The numba version is multi-threaded and usually faster, which
makes fitting and predicting faster.
cv : None or cross-validation generator
Any scikit-learn cross-validation generator. If not given, will use the default
set by :func:`verde.cross_val_score`.
client : None or dask.distributed.Client
If None, then computations are run serially. Otherwise, should be a
dask ``Client`` object. It will be used to dispatch computations to the
dask cluster.
Attributes
----------
force_ : array
The estimated forces that fit the observed data.
force_coords_ : tuple of arrays
The easting and northing coordinates of the point forces. Same as *force_coords*
if it is not None. Otherwise, same as the data locations used to fit the spline.
region_ : tuple
The boundaries (``[W, E, S, N]``) of the data used to fit the
interpolator. Used as the default region for the
:meth:`~verde.SplineCV.grid` and :meth:`~verde.SplineCV.scatter` methods.
scores_ : array
The mean cross-validation score for each parameter combination.
mindist_ : float
The optimal value for the *mindist* parameter.
damping_ : float
The optimal value for the *damping* parameter.
See also
--------
Spline : The bi-harmonic spline
cross_val_score : Score an estimator/gridder using cross-validation
"""
def __init__(
self,
mindists=(1e3, 10e3, 100e3),
dampings=(1e-10, 1e-5, 1e-1),
force_coords=None,
engine="auto",
cv=None,
client=None,
):
super().__init__()
self.dampings = dampings
self.mindists = mindists
self.force_coords = force_coords
self.engine = engine
self.cv = cv
self.client = client
[docs] def fit(self, coordinates, data, weights=None):
"""
Fit the biharmonic spline to the given data and automatically tune parameters.
For each combination of the parameters given, computes the mean cross validation
score using :func:`verde.cross_val_score` and the given CV splitting class (the
*cv* parameter of this class). The configuration with the best score is then
chosen and used to fit the entire dataset.
The data region is captured and used as default for the
:meth:`~verde.SplineCV.grid` and :meth:`~verde.SplineCV.scatter` methods.
All input arrays must have the same shape.
Parameters
----------
coordinates : tuple of arrays
Arrays with the coordinates of each data point. Should be in the
following order: (easting, northing, vertical, ...). Only easting
and northing will be used, all subsequent coordinates will be
ignored.
data : array
The data values of each data point.
weights : None or array
If not None, then the weights assigned to each data point.
Typically, this should be 1 over the data uncertainty squared.
Returns
-------
self
Returns this estimator instance for chaining operations.
"""
if self.client is None:
client = DummyClient()
else:
client = self.client
# If this is a parallel client, scattering the data makes sure each worker has a
# copy of the data so we minimize transfer.
futures = tuple(client.scatter(i) for i in (coordinates, data, weights))
parameter_sets = [
dict(
mindist=combo[0],
damping=combo[1],
engine=self.engine,
force_coords=self.force_coords,
)
for combo in itertools.product(self.mindists, self.dampings)
]
scores = []
for params in parameter_sets:
spline = Spline(**params)
scores.append(
client.submit(
cross_val_score,
spline,
coordinates=futures[0],
data=futures[1],
weights=futures[2],
cv=self.cv,
)
)
if self.client is not None:
scores = [score.result() for score in scores]
self.scores_ = np.mean(scores, axis=1)
best = np.argmax(self.scores_)
self._best = Spline(**parameter_sets[best])
self._best.fit(coordinates, data, weights=weights)
return self
@property
def force_(self):
"The estimated forces that fit the data."
return self._best.force_
@property
def region_(self):
"The bounding region of the data used to fit the spline"
return self._best.region_
@property
def damping_(self):
"The optimal damping parameter"
return self._best.damping
@property
def mindist_(self):
"The optimal mindist parameter"
return self._best.mindist
@property
def force_coords_(self):
"The optimal force locations"
return self._best.force_coords_
[docs] def predict(self, coordinates):
"""
Evaluate the best estimated spline on the given set of points.
Requires a fitted estimator (see :meth:`~verde.SplineCV.fit`).
Parameters
----------
coordinates : tuple of arrays
Arrays with the coordinates of each data point. Should be in the
following order: (easting, northing, vertical, ...). Only easting
and northing will be used, all subsequent coordinates will be
ignored.
Returns
-------
data : array
The data values evaluated on the given points.
"""
check_is_fitted(self, ["_best"])
return self._best.predict(coordinates)
[docs]class Spline(BaseGridder):
r"""
Biharmonic spline interpolation using Green's functions.
This gridder assumes Cartesian coordinates.
Implements the 2D splines of [Sandwell1987]_. The Green's function for the spline
corresponds to the elastic deflection of a thin sheet subject to a vertical force.
For an observation point at the origin and a force at the coordinates given by the
vector :math:`\mathbf{x}`, the Green's function is:
.. math::
g(\mathbf{x}) = \|\mathbf{x}\|^2 \left(\log \|\mathbf{x}\| - 1\right)
In practice, this function is not defined for data points that coincide with a
force. To prevent this, a fudge factor is added to :math:`\|\mathbf{x}\|`.
The interpolation is performed by estimating forces that produce deflections that
fit the observed data (using least-squares). Then, the interpolated points can be
evaluated at any location.
By default, the forces will be placed at the same points as the input data given to
:meth:`~verde.Spline.fit`. This configuration provides an exact solution on top of
the data points. However, this solution can be unstable for certain configurations
of data points.
Approximate (and more stable) solutions can be obtained by applying damping
regularization to smooth the estimated forces (and interpolated values) or by not
using the data coordinates to position the forces (use the *force_coords*
parameter).
Data weights can be used during fitting but only have an any effect when using the
approximate solutions.
Before fitting, the Jacobian (design, sensitivity, feature, etc) matrix for the
spline is normalized using :class:`sklearn.preprocessing.StandardScaler` without
centering the mean so that the transformation can be undone in the estimated forces.
Parameters
----------
mindist : float
A minimum distance between the point forces and data points. Needed because the
Green's functions are singular when forces and data points coincide. Acts as a
fudge factor.
damping : None or float
The positive damping regularization parameter. Controls how much smoothness is
imposed on the estimated forces. If None, no regularization is used.
force_coords : None or tuple of arrays
The easting and northing coordinates of the point forces. If None (default),
then will be set to the data coordinates used to fit the spline.
engine : str
Computation engine for the Jacobian matrix and prediction. Can be ``'auto'``,
``'numba'``, or ``'numpy'``. If ``'auto'``, will use numba if it is installed or
numpy otherwise. The numba version is multi-threaded and usually faster, which
makes fitting and predicting faster.
Attributes
----------
force_ : array
The estimated forces that fit the observed data.
force_coords_ : tuple of arrays
The easting and northing coordinates of the point forces. Same as *force_coords*
if it is not None. Otherwise, same as the data locations used to fit the spline.
region_ : tuple
The boundaries (``[W, E, S, N]``) of the data used to fit the
interpolator. Used as the default region for the
:meth:`~verde.Spline.grid` and :meth:`~verde.Spline.scatter` methods.
See also
--------
SplineCV : Cross-validated version of the bi-harmonic spline
"""
def __init__(self, mindist=1e-5, damping=None, force_coords=None, engine="auto"):
super().__init__()
self.mindist = mindist
self.damping = damping
self.force_coords = force_coords
self.engine = engine
[docs] def fit(self, coordinates, data, weights=None):
"""
Fit the biharmonic spline to the given data.
The data region is captured and used as default for the
:meth:`~verde.Spline.grid` and :meth:`~verde.Spline.scatter` methods.
All input arrays must have the same shape.
Parameters
----------
coordinates : tuple of arrays
Arrays with the coordinates of each data point. Should be in the
following order: (easting, northing, vertical, ...). Only easting
and northing will be used, all subsequent coordinates will be
ignored.
data : array
The data values of each data point.
weights : None or array
If not None, then the weights assigned to each data point.
Typically, this should be 1 over the data uncertainty squared.
Returns
-------
self
Returns this estimator instance for chaining operations.
"""
coordinates, data, weights = check_fit_input(coordinates, data, weights)
warn_weighted_exact_solution(self, weights)
# Capture the data region to use as a default when gridding.
self.region_ = get_region(coordinates[:2])
if self.force_coords is None:
self.force_coords_ = tuple(i.copy() for i in n_1d_arrays(coordinates, n=2))
else:
self.force_coords_ = self.force_coords
jacobian = self.jacobian(coordinates[:2], self.force_coords_)
self.force_ = least_squares(jacobian, data, weights, self.damping)
return self
[docs] def predict(self, coordinates):
"""
Evaluate the estimated spline on the given set of points.
Requires a fitted estimator (see :meth:`~verde.Spline.fit`).
Parameters
----------
coordinates : tuple of arrays
Arrays with the coordinates of each data point. Should be in the
following order: (easting, northing, vertical, ...). Only easting
and northing will be used, all subsequent coordinates will be
ignored.
Returns
-------
data : array
The data values evaluated on the given points.
"""
check_is_fitted(self, ["force_"])
shape = np.broadcast(*coordinates[:2]).shape
force_east, force_north = n_1d_arrays(self.force_coords_, n=2)
east, north = n_1d_arrays(coordinates, n=2)
data = np.empty(east.size, dtype=east.dtype)
if parse_engine(self.engine) == "numba":
data = predict_numba(
east, north, force_east, force_north, self.mindist, self.force_, data
)
else:
data = predict_numpy(
east, north, force_east, force_north, self.mindist, self.force_, data
)
return data.reshape(shape)
[docs] def jacobian(self, coordinates, force_coords, dtype="float64"):
"""
Make the Jacobian matrix for the 2D biharmonic spline.
Each column of the Jacobian is the Green's function for a single force evaluated
on all observation points [Sandwell1987]_.
Parameters
----------
coordinates : tuple of arrays
Arrays with the coordinates of each data point. Should be in the
following order: (easting, northing, vertical, ...). Only easting and
northing will be used, all subsequent coordinates will be ignored.
force_coords : tuple of arrays
Arrays with the coordinates for the forces. Should be in the same order as
the coordinate arrays.
dtype : str or numpy dtype
The type of the Jacobian array.
Returns
-------
jacobian : 2D array
The (n_data, n_forces) Jacobian matrix.
"""
force_east, force_north = n_1d_arrays(force_coords, n=2)
east, north = n_1d_arrays(coordinates, n=2)
jac = np.empty((east.size, force_east.size), dtype=dtype)
if parse_engine(self.engine) == "numba":
jac = jacobian_numba(
east, north, force_east, force_north, self.mindist, jac
)
else:
jac = jacobian_numpy(
east, north, force_east, force_north, self.mindist, jac
)
return jac
def warn_weighted_exact_solution(spline, weights):
"""
Warn the user that a weights doesn't work for the exact solution.
Parameters
----------
spline : estimator
The spline instance that we'll check. Needs to have the ``damping`` attribute.
weights : array or None
The weights given to fit.
"""
# Check if we're using weights without damping and warn the user that it might not
# have any effect.
if weights is not None and spline.damping is None:
warn(
"Weights might have no effect if no regularization is used. "
"Use damping or specify force positions that are different from the data."
)
def greens_func(east, north, mindist):
"Calculate the Green's function for the Bi-Harmonic Spline"
distance = np.sqrt(east ** 2 + north ** 2)
# The mindist factor helps avoid singular matrices when the force and
# computation point are too close
distance += mindist
return (distance ** 2) * (np.log(distance) - 1)
def predict_numpy(east, north, force_east, force_north, mindist, forces, result):
"Calculate the predicted data using numpy."
result[:] = 0
for j in range(forces.size):
green = greens_func(east - force_east[j], north - force_north[j], mindist)
result += green * forces[j]
return result
def jacobian_numpy(east, north, force_east, force_north, mindist, jac):
"Calculate the Jacobian using numpy broadcasting."
# Reshaping the data to a column vector will automatically build a distance matrix
# between each data point and force.
jac[:] = greens_func(
east.reshape((east.size, 1)) - force_east,
north.reshape((north.size, 1)) - force_north,
mindist,
)
return jac
@jit(**JIT_ARGS)
def predict_numba(east, north, force_east, force_north, mindist, forces, result):
"Calculate the predicted data using numba to speed things up."
for i in numba.prange(east.size): # pylint: disable=not-an-iterable
result[i] = 0
for j in range(forces.size):
green = GREENS_FUNC_JIT(
east[i] - force_east[j], north[i] - force_north[j], mindist
)
result[i] += green * forces[j]
return result
@jit(**JIT_ARGS)
def jacobian_numba(east, north, force_east, force_north, mindist, jac):
"Calculate the Jacobian matrix using numba to speed things up."
for i in range(east.size):
for j in range(force_east.size):
jac[i, j] = GREENS_FUNC_JIT(
east[i] - force_east[j], north[i] - force_north[j], mindist
)
return jac
# Jit compile the Green's functions for use in the numba functions
GREENS_FUNC_JIT = jit(**JIT_ARGS)(greens_func)
```