"""
General utilities.
"""
import functools
import dask
import numpy as np
import pandas as pd
from scipy.spatial import cKDTree # pylint: disable=no-name-in-module
try:
from pykdtree.kdtree import KDTree as pyKDTree
except ImportError:
pyKDTree = None
try:
import numba
except ImportError:
numba = None
from .base.utils import check_data, n_1d_arrays
def dispatch(function, delayed=False, client=None):
"""
Decide how to wrap a function for Dask depending on the options given.
Parameters
----------
function : callable
The function that will be called.
delayed : bool
If True, will wrap the function in :func:`dask.delayed`.
client : None or dask.distributed Client
If *delayed* is False and *client* is not None, will return a
:func:`functools.partial` of ``client.submit`` with the function as the
first argument.
Returns
-------
function : callable
The function wrapped in Dask.
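Examples
--------
A minimal sketch of the two local modes (dispatching through a
``dask.distributed`` client needs a live cluster, so it's not run here):
>>> def mean(values):
...     return sum(values) / len(values)
>>> dispatch(mean)([1, 2, 3])
2.0
>>> lazy_mean = dispatch(mean, delayed=True)
>>> result = lazy_mean([1, 2, 3])  # a dask.delayed object, not yet computed
>>> result.compute()
2.0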
"""
if delayed:
return dask.delayed(function)
if client is not None:
return functools.partial(client.submit, function)
return function
def parse_engine(engine):
"""
Choose the best engine available and check if it's valid.
Parameters
----------
engine : str
The name of the engine. If ``"auto"`` will favor numba if it's
available.
Returns
-------
engine : str
The name of the engine that should be used.
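Examples
--------
A quick sketch of the expected behavior (the ``"auto"`` result depends on
whether numba is installed):
>>> parse_engine("numpy")
'numpy'
>>> parse_engine("auto") in {"numba", "numpy"}
True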
"""
engines = {"auto", "numba", "numpy"}
if engine not in engines:
raise ValueError("Invalid engine '{}'. Must be in {}.".format(engine, engines))
if engine == "auto":
if numba is None:
return "numpy"
return "numba"
return engine
def dummy_jit(**kwargs): # pylint: disable=unused-argument
"""
Replacement for :func:`numba.jit` when numba is not installed.
Use as a decorator. The decorated function raises :class:`RuntimeError`
when called.
Parameters
----------
function
A function that you would decorate with :func:`numba.jit`.
Returns
-------
function
A function that raises :class:`RuntimeError` warning that numba isn't
installed.
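Examples
--------
A sketch of the failure mode when the dummy decorated function is called:
>>> @dummy_jit(nopython=True)
... def add(a, b):
...     return a + b
>>> add(1, 2)
Traceback (most recent call last):
    ...
RuntimeError: Could not find numba.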
"""
def dummy_decorator(function):
"The actual decorator"
@functools.wraps(function)
def dummy_function(*args, **kwargs): # pylint: disable=unused-argument
"Just raise an exception."
raise RuntimeError("Could not find numba.")
return dummy_function
return dummy_decorator
def variance_to_weights(variance, tol=1e-15, dtype="float64"):
"""
Convert data variances to weights for gridding.
Weights are defined as the inverse of the variance, scaled to the range
[0, 1], i.e. ``variance.min()/variance``.
Any variance smaller than *tol* will automatically receive a weight of 1
to avoid division by zero or blown-up weights.
Parameters
----------
variance : array or tuple of arrays
An array with the variance of each point. If there are multiple arrays
in a tuple, will calculate weights for each of them separately. Can
have NaNs but they will be converted to zeros and therefore receive a
weight of 1.
tol : float
The tolerance, or cutoff threshold, for small variances.
dtype : str or numpy dtype
The type of the output weights array.
Returns
-------
weights : array or tuple of arrays
Data weights in the range [0, 1] with the same shape as *variance*. If
more than one variance array was provided, then this will be a tuple
with the weights corresponding to each variance array.
Examples
--------
>>> print(variance_to_weights([0, 2, 0.2, 1e-16]))
[1. 0.1 1. 1. ]
>>> print(variance_to_weights([0, 0, 0, 0]))
[1. 1. 1. 1.]
>>> for w in variance_to_weights(([0, 1, 10], [2, 4.0, 8])):
... print(w)
[1. 1. 0.1]
[1. 0.5 0.25]
"""
variance = check_data(variance)
weights = []
for var in variance:
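# NaNs become zeros here, fall below *tol*, and thus get a weight of 1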
var = np.nan_to_num(np.atleast_1d(var), copy=False)
w = np.ones_like(var, dtype=dtype)
nonzero = var > tol
if np.any(nonzero):
nonzero_var = var[nonzero]
w[nonzero] = nonzero_var.min() / nonzero_var
weights.append(w)
if len(weights) == 1:
return weights[0]
return tuple(weights)
def maxabs(*args, nan=True):
"""
Calculate the maximum absolute value of the given array(s).
Use this to set the limits of your colorbars and center them on zero.
Parameters
----------
args
One or more arrays. If more than one are given, a single maximum will
be calculated across all arrays.
nan : bool
If True (default), use the NaN-aware :func:`numpy.nanmin` and
:func:`numpy.nanmax` so that NaNs are ignored in the calculation.
Returns
-------
maxabs : float
The maximum absolute value across all arrays.
Examples
--------
>>> maxabs((1, -10, 25, 2, 3))
25
>>> maxabs((1, -10.5, 25, 2), (0.1, 100, -500), (-200, -300, -0.1, -499))
500.0
If the array contains NaNs, we'll use the ``nan`` version of the numpy
functions by default. You can turn this off through the *nan* argument.
>>> import numpy as np
>>> maxabs((1, -10, 25, 2, 3, np.nan))
25.0
>>> maxabs((1, -10, 25, 2, 3, np.nan), nan=False)
nan
"""
arrays = [np.atleast_1d(i) for i in args]
if nan:
npmin, npmax = np.nanmin, np.nanmax
else:
npmin, npmax = np.min, np.max
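# The maximum absolute value is attained at either the minimum or the
# maximum of each array, so we only take the absolute value of those two
# numbers instead of the full array.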
absolute = [npmax(np.abs([npmin(i), npmax(i)])) for i in arrays]
return npmax(absolute)
def grid_to_table(grid):
"""
Convert a grid to a table with the values and coordinates of each point.
Takes a 2D grid as input, extracts the coordinates and runs them through
:func:`numpy.meshgrid` to create a 2D table. Works for 2D grids and any
number of variables. Use cases include passing gridded data to functions
that expect data in XYZ format, such as :class:`verde.BlockReduce`.
Parameters
----------
grid : :class:`xarray.Dataset` or :class:`xarray.DataArray`
A 2D grid with one or more data variables.
Returns
-------
table : :class:`pandas.DataFrame`
Table with coordinates and variable values for each point in the grid.
Column names are taken from the grid. If *grid* is a
:class:`xarray.DataArray` that doesn't have a ``name`` attribute
defined, the column with data values will be called ``"scalars"``.
Examples
--------
>>> import xarray as xr
>>> import numpy as np
>>> # Create a sample grid with a single data variable
>>> temperature = xr.DataArray(
... np.arange(20).reshape((4, 5)),
... coords=(np.arange(4), np.arange(5, 10)),
... dims=['northing', 'easting']
... )
>>> print(temperature.values)
[[ 0 1 2 3 4]
[ 5 6 7 8 9]
[10 11 12 13 14]
[15 16 17 18 19]]
>>> # For DataArrays, the data column will be "scalars" by default
>>> table = grid_to_table(temperature)
>>> list(sorted(table.columns))
['easting', 'northing', 'scalars']
>>> print(table.scalars.values)
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19]
>>> print(table.northing.values)
[0 0 0 0 0 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3]
>>> print(table.easting.values)
[5 6 7 8 9 5 6 7 8 9 5 6 7 8 9 5 6 7 8 9]
>>> # If the DataArray defines a "name", we will use that instead
>>> temperature.name = "temperature_K"
>>> table = grid_to_table(temperature)
>>> list(sorted(table.columns))
['easting', 'northing', 'temperature_K']
>>> # Conversion of Datasets will preserve the data variable names
>>> grid = xr.Dataset({"temperature": temperature})
>>> table = grid_to_table(grid)
>>> list(sorted(table.columns))
['easting', 'northing', 'temperature']
>>> print(table.temperature.values)
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19]
>>> print(table.northing.values)
[0 0 0 0 0 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3]
>>> print(table.easting.values)
[5 6 7 8 9 5 6 7 8 9 5 6 7 8 9 5 6 7 8 9]
>>> # Grids with multiple data variables will have more columns.
>>> wind_speed = xr.DataArray(
... np.arange(20, 40).reshape((4, 5)),
... coords=(np.arange(4), np.arange(5, 10)),
... dims=['northing', 'easting']
... )
>>> grid['wind_speed'] = wind_speed
>>> table = grid_to_table(grid)
>>> list(sorted(table.columns))
['easting', 'northing', 'temperature', 'wind_speed']
>>> print(table.northing.values)
[0 0 0 0 0 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3]
>>> print(table.easting.values)
[5 6 7 8 9 5 6 7 8 9 5 6 7 8 9 5 6 7 8 9]
>>> print(table.temperature.values)
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19]
>>> print(table.wind_speed.values)
[20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39]
"""
if hasattr(grid, "data_vars"):
# It's a Dataset
data_names = list(grid.data_vars.keys())
data_arrays = [grid[name].values.ravel() for name in data_names]
coordinate_names = list(grid[data_names[0]].dims)
else:
# It's a DataArray
data_names = [grid.name if grid.name is not None else "scalars"]
data_arrays = [grid.values.ravel()]
coordinate_names = list(grid.dims)
north = grid.coords[coordinate_names[0]].values
east = grid.coords[coordinate_names[1]].values
# Need to flip the coordinates because the names are in northing and
# easting order
coordinates = [i.ravel() for i in np.meshgrid(east, north)][::-1]
data_dict = dict(zip(coordinate_names, coordinates))
data_dict.update(dict(zip(data_names, data_arrays)))
return pd.DataFrame(data_dict)
def kdtree(coordinates, use_pykdtree=True, **kwargs):
"""
Create a KD-Tree object with the given coordinate arrays.
Automatically transposes and flattens the coordinate arrays into a single
matrix for use in the KD-Tree classes.
All other keyword arguments are passed to the KD-Tree class.
If installed, package ``pykdtree`` will be used instead of
:class:`scipy.spatial.cKDTree` for better performance. Not all features
are available in ``pykdtree``, so if you require the scipy version, set
``use_pykdtree=False``.
Parameters
----------
coordinates : tuple of arrays
Arrays with the coordinates of each data point. Should be in the
following order: (easting, northing, vertical, ...). All coordinate
arrays are used.
use_pykdtree : bool
If True, will prefer ``pykdtree`` (if installed) over
:class:`scipy.spatial.cKDTree`. Otherwise, always use the scipy
version.
Returns
-------
tree : :class:`scipy.spatial.cKDTree` or ``pykdtree.kdtree.KDTree``
The tree instance initialized with the given coordinates and arguments.
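Examples
--------
A small sketch forcing the scipy backend (``use_pykdtree=False``) so the
output types are predictable regardless of what's installed:
>>> import numpy as np
>>> easting = np.array([0.0, 1.0, 2.0])
>>> northing = np.array([0.0, 0.0, 0.0])
>>> tree = kdtree((easting, northing), use_pykdtree=False)
>>> distance, index = tree.query(np.array([[0.6, 0.0]]))
>>> print(index)
[1]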
"""
points = np.transpose(n_1d_arrays(coordinates, len(coordinates)))
if pyKDTree is not None and use_pykdtree:
tree = pyKDTree(points, **kwargs)
else:
tree = cKDTree(points, **kwargs)
return tree
def partition_by_sum(array, parts):
"""
Partition an array into parts of approximately equal sum.
Does not change the order of the array elements.
Produces the partition indices for the array. Use :func:`numpy.split` to
divide the array along these indices.
.. warning::
Depending on the input and number of parts, there might not exist
partition points. In these cases, the function will raise
``ValueError``. This is more likely to happen as the number of parts
approaches the number of elements in the array.
Parameters
----------
array : array or array-like
The 1D array that will be partitioned. The array will be raveled before
computations.
parts : int
Number of parts to split the array. Can be at most the number of
elements in the array.
Returns
-------
indices : array
The indices at which the array should be split.
Notes
-----
Solution from https://stackoverflow.com/a/54024280
Examples
--------
>>> import numpy as np
>>> array = np.arange(10)
>>> split_points = partition_by_sum(array, parts=2)
>>> print(split_points)
[7]
>>> for part in np.split(array, split_points):
... print(part, part.sum())
[0 1 2 3 4 5 6] 21
[7 8 9] 24
>>> split_points = partition_by_sum(array, parts=3)
>>> print(split_points)
[6 8]
>>> for part in np.split(array, split_points):
... print(part, part.sum())
[0 1 2 3 4 5] 15
[6 7] 13
[8 9] 17
>>> split_points = partition_by_sum(array, parts=5)
>>> print(split_points)
[4 6 7 9]
>>> for part in np.split(array, split_points):
... print(part, part.sum())
[0 1 2 3] 6
[4 5] 9
[6] 6
[7 8] 15
[9] 9
>>> # Use an array with a random looking element order
>>> array = [5, 6, 4, 6, 8, 1, 2, 6, 3, 3]
>>> split_points = partition_by_sum(array, parts=2)
>>> print(split_points)
[4]
>>> for part in np.split(array, split_points):
... print(part, part.sum())
[5 6 4 6] 21
[8 1 2 6 3 3] 23
>>> # Splits can have very different sums but this is the best that can
>>> # be done without changing the order of the array.
>>> split_points = partition_by_sum(array, parts=5)
>>> print(split_points)
[1 3 4 7]
>>> for part in np.split(array, split_points):
... print(part, part.sum())
[5] 5
[6 4] 10
[6] 6
[8 1 2] 11
[6 3 3] 12
"""
array = np.atleast_1d(array).ravel()
if parts > array.size:
raise ValueError(
"Cannot partition an array of size {} into {} parts of equal sum.".format(
array.size, parts
)
)
cumulative_sum = array.cumsum()
# Ideally, we want each part to have the same sum (total / parts).
ideal_sum = cumulative_sum[-1] // parts
# If the parts are ideal, the cumulative sum of each part will be this
ideal_cumsum = np.arange(1, parts) * ideal_sum
# Find the places in the real cumulative sum where the ideal values would
# be. These are the split points. Between each split point, the sum of
# elements will be approximately the ideal sum. Need to insert to the right
# side so that we find cumsum[i - 1] <= ideal < cumsum[i]. This way, if a
# part has ideal sum, the last element (i - 1) will be included. Otherwise,
# we would never have ideal sums.
indices = np.searchsorted(cumulative_sum, ideal_cumsum, side="right")
# Check for repeated split points, which indicates that there is no way to
# split the array.
if np.unique(indices).size != indices.size:
raise ValueError(
"Could not find partition points to split the array into {} parts "
"of equal sum.".format(parts)
)
return indices